diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index d88a26cbb2ef5ef31c03ffa7d7a8c0b58499adb8..6f978b0143b8f30de1d83f2db3e0d404c6aec44d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1251,7 +1251,8 @@ TEST(clientCase, td_25129) { } for(int i = 0; i < numOfAssign; i++){ - printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("assign i:%d, vgId:%d, committed:%lld, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, committed, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); } while (1) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 6bd23c3b902494d9b5f791bf3542401dc34caf52..85054e5cd7302a883047a402227b285d20f0463c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -692,6 +692,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { taosArrayDestroy(rebOutput.modifyConsumers); taosArrayDestroy(rebOutput.rebVgs); + taosHashCancelIterate(pReq->rebSubHash, pIter); terrno = TSDB_CODE_OUT_OF_MEMORY; mInfo("mq re-balance failed, due to out of memory"); taosHashCleanup(pReq->rebSubHash); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index db9ef7805faebe17394857de9b83d0e9cc4f22da..824024b1d09f4cb23e87ceceb94c7b4a4843c1fb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -336,10 +336,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t STqOffset* pOffset = &vgOffset.offset; if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) { - tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, + tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts); } else if (pOffset->val.type == TMQ_OFFSET__LOG) { - tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, + tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, pOffset->val.version); } else { tqError("invalid commit offset type:%d", pOffset->val.type); @@ -367,12 +367,12 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { SRpcMsg rsp = {.info = pMsg->info}; int code = 0; - tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; } + tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 81f87a3e22ad6e3118cf8c883ec22753bc3ddd1c..bf1abf579517ac28169e85e3e2f002d726901b19 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -624,6 +624,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); + case TDMT_VND_TMQ_VG_COMMITTEDINFO: + return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg); + case TDMT_VND_TMQ_SEEK: + return tqProcessSeekReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;