提交 f55f8d24 编写于 作者: H Haojun Liao

fix(tmq): avoid the offset is revised by pool rsp which is issued before...

fix(tmq): avoid the offset is revised by pool rsp which is issued before invoking tmq_offset_seek function.
上级 3896e148
...@@ -147,8 +147,9 @@ typedef struct { ...@@ -147,8 +147,9 @@ typedef struct {
int32_t vgId; int32_t vgId;
int32_t vgStatus; int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups int32_t vgSkipCnt; // here used to mark the slow vgroups
bool receiveInfo; bool receivedInfoFromVnode;// has already received info from vnode
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
SEpSet epSet; SEpSet epSet;
} SMqClientVg; } SMqClientVg;
...@@ -1704,6 +1705,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p ...@@ -1704,6 +1705,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
pVg->seekUpdated = false; // reset this flag.
pTmq->pollCnt++; pTmq->pollCnt++;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1806,8 +1808,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1806,8 +1808,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values. // update the local offset value only for the returned values, only when the local offset is NOT updated
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
}
// update the status // update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
...@@ -1815,7 +1820,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1815,7 +1820,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// update the valid wal version range // update the valid wal version range
pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver; pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
pVg->receiveInfo = true; pVg->receivedInfoFromVnode = true;
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &pDataRsp->rspOffset); tFormatOffset(buf, 80, &pDataRsp->rspOffset);
...@@ -2433,7 +2438,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2433,7 +2438,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for (int32_t j = 0; j < (*numOfAssignment); ++j) { for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (!pClientVg->receiveInfo) { if (!pClientVg->receivedInfoFromVnode) {
needFetch = true; needFetch = true;
break; break;
} }
...@@ -2623,6 +2628,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2623,6 +2628,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
pOffsetInfo->currentOffset.version = offset; pOffsetInfo->currentOffset.version = offset;
pOffsetInfo->committedOffset.version = INT64_MIN; pOffsetInfo->committedOffset.version = INT64_MIN;
pVg->seekUpdated = true;
} }
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
......
...@@ -389,6 +389,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) ...@@ -389,6 +389,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
return -1; return -1;
} }
walReaderSeekVer(pHandle->execHandle.pTqReader->pWalReader, vgOffset.offset.val.version);
tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
vgOffset.consumerId, vgOffset.offset.val.version); vgOffset.consumerId, vgOffset.offset.val.version);
......
...@@ -255,18 +255,22 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -255,18 +255,22 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (metaRsp.metaRspLen > 0) { if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp); taosMemoryFree(metaRsp.metaRsp);
goto end; goto end;
} }
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) { if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end; goto end;
}else { } else {
*offset = taosxRsp.rspOffset; *offset = taosxRsp.rspOffset;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册