diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 807e6cb53b7c4e80c5bbb71bba46a10e3e2b95a9..50a42d547c20efe7b795489b4a0caf4a70d0b75d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2276,7 +2276,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { return pRspObj->rsp.reqOffset.version; } } else{ - tscError("invalid tmqtype:%d", *(int8_t*)res); + tscError("invalid tmq type:%d", *(int8_t*)res); } // data from tsdb, no valid offset info diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 7768f71c61063f7c63aa36142840baaef801481d..8e9f043f625c8b2b6f69ca633a3bc04825721f0a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -171,6 +171,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); + dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -191,7 +192,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } taosWUnLockLatch(&pTq->lock); } - setRequestVersion(pOffset, pOffset->version); + setRequestVersion(&dataRsp.reqOffset, pOffset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); end : {