diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 779d037a93046a8d663abc35ef56173726f3ee85..19623a68697ae59e8a8378fdf2b0415ea487364b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1604,10 +1604,11 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg) { +SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); pRspObj->resType = RES_TYPE__TMQ; + (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); @@ -1626,8 +1627,8 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg) for(int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); int64_t rows = htobe64(pRetrieve->numOfRows); - pRspObj->resInfo.totalRows += rows; pVg->numOfRows += rows; + (*numOfRows) += rows; } return pRspObj; @@ -1839,11 +1840,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } else { // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg); + int64_t numOfRows = 0; + SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pRsp->resInfo.totalRows, pollRspWrapper->reqId); + tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pollRspWrapper->reqId); - tmq->totalRows += pRsp->resInfo.totalRows; + tmq->totalRows += numOfRows; taosFreeQitem(pollRspWrapper); return pRsp; } @@ -1895,12 +1897,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp void* pRsp = NULL; + int64_t numOfRows = 0; if (pollRspWrapper->taosxRsp.createTableNum == 0) { - pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg); + pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); } + tmq->totalRows += numOfRows; + char buf[80]; tFormatOffset(buf, 80, &pVg->currentOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId,