diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9f24deff94adf5da052d70987e8b9bc926d3faf0..6a9f88ce150d69a4880556b9635df3e65cd07441 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1289,9 +1289,11 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req offset:%" PRId64 ", rsp offset:%" PRId64 " type %d, reqId:0x%"PRIx64, - tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, - rspType, requestId); + tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%" PRId64 + " type %d, reqId:0x%" PRIx64, + tmq->consumerId, pParam->vgId, pRspWrapper->dataRsp.reqOffset.version, + pRspWrapper->dataRsp.rspOffset.version, rspType, requestId); + } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); @@ -1378,6 +1380,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgNumGet; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);