提交 9eb15e41 编写于 作者: wmmhello's avatar wmmhello

fix:put poll to push manager if wal not exist when offset is latest

上级 5eb1c559
......@@ -87,12 +87,13 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
return 0;
}
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest) {
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode);
*pBlockReturned = false;
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
if (pOffset != NULL) {
*pOffsetVal = pOffset->val;
......@@ -120,12 +121,27 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
STqOffset offset = {0};
strcpy(offset.subKey, pRequest->subKey);
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
*pBlockReturned = true;
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer);
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
*pBlockReturned = true;
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed",
......@@ -154,8 +170,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST || (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId)) {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
// lock
taosWLockLatch(&pTq->lock);
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
......@@ -296,11 +311,16 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// 1. reset the offset if needed
if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
// handle the reset offset cases, according to the consumer's choice.
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest);
bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
}
// empty block returned, quit
if (blockReturned) {
return 0;
}
} else { // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册