From 42422b7a659eb0684ab491a924da9ee9e41974f4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Oct 2022 21:20:35 +0800 Subject: [PATCH] fix(taosx): reset latest offset --- source/dnode/vnode/src/tq/tq.c | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 42a597a305..1b4f67cc71 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -516,17 +516,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); - - tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, - pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { - code = -1; + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + + tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); + tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, + pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); + if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + code = -1; + } + tDeleteSMqDataRsp(&dataRsp); + return code; + } else { + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pReq); + tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); + if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { + code = -1; + } + tDeleteSTaosxRsp(&taosxRsp); + return code; } - tDeleteSMqDataRsp(&dataRsp); - return code; } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 " in vg %d, subkey %s, reset none failed", -- GitLab