diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 7883f198a8594f04ef9bab47ed6a44aafadcd17e..fc9f7540f7ea683b61c6c8fdd135008ed79e1101 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -91,7 +91,9 @@ struct tmq_t { int8_t epStatus; int32_t epSkipCnt; #endif - int64_t pollCnt; + // poll info + int64_t pollCnt; + int64_t totalRows; // timer tmr_h hbLiveTimer; @@ -127,6 +129,7 @@ enum { typedef struct { int64_t pollCnt; + int64_t numOfRows; STqOffsetVal committedOffset; STqOffsetVal currentOffset; int32_t vgId; @@ -629,8 +632,7 @@ FAIL: return 0; } -static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, - void* userParam) { +static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { int32_t code = -1; SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); @@ -734,6 +736,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); } + taosReleaseRef(tmqMgmt.rsetId, refId); } void tmqAssignAskEpTask(void* param, void* tmrId) { @@ -757,6 +760,8 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); } + + taosReleaseRef(tmqMgmt.rsetId, refId); taosMemoryFree(param); } @@ -770,6 +775,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { void tmqSendHbReq(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { taosMemoryFree(param); @@ -783,17 +789,19 @@ void tmqSendHbReq(void* param, void* tmrId) { int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { tscError("tSerializeSMqHbReq failed"); - return; + goto OVER; } + void* pReq = taosMemoryCalloc(1, tlen); if (tlen < 0) { tscError("failed to malloc MqHbReq msg, size:%d", tlen); - return; + goto OVER; } + if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) { tscError("tSerializeSMqHbReq %d failed", tlen); taosMemoryFree(pReq); - return; + goto OVER; } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -801,6 +809,7 @@ void tmqSendHbReq(void* param, void* tmrId) { taosMemoryFree(pReq); goto OVER; } + sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, @@ -820,6 +829,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer); + taosReleaseRef(tmqMgmt.rsetId, refId); } int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { @@ -960,8 +970,15 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { return rsp; } +static void freeClientVgImpl(void* param) { + SMqClientTopic* pTopic = param; + taosMemoryFreeClear(pTopic->schema.pSchema); + taosArrayDestroy(pTopic->vgs); +} + void tmqFreeImpl(void* handle) { - tmq_t* tmq = (tmq_t*)handle; + tmq_t* tmq = (tmq_t*)handle; + int64_t id = tmq->consumerId; // TODO stop timer if (tmq->mqueue) { @@ -977,16 +994,11 @@ void tmqFreeImpl(void* handle) { tsem_destroy(&tmq->rspSem); taosThreadMutexDestroy(&tmq->lock); - int32_t sz = taosArrayGetSize(tmq->clientTopics); - for (int32_t i = 0; i < sz; i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - taosMemoryFreeClear(pTopic->schema.pSchema); - taosArrayDestroy(pTopic->vgs); - } - - taosArrayDestroy(tmq->clientTopics); + taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl); taos_close_internal(tmq->pTscObj); taosMemoryFree(tmq); + + tscDebug("consumer:0x%" PRIx64 " closed", id); } static void tmqMgmtInit(void) { @@ -1086,8 +1098,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { char buf[80] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; tFormatOffset(buf, tListLen(buf), &offset); - tscInfo("consumer:0x%" PRIx64 " is setup, groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d", - pTmq->consumerId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf, + tscInfo("consumer:0x%" PRIx64 " is setup, refId:%"PRId64", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d", + pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf, pTmq->hbBgEnable); return pTmq; @@ -1228,10 +1240,12 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; + + int64_t refId = pParam->refId; SMqClientVg* pVg = pParam->pVg; SMqClientTopic* pTopic = pParam->pTopic; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); @@ -1282,6 +1296,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId); tsem_post(&tmq->rspSem); + taosReleaseRef(tmqMgmt.rsetId, refId); + taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); return 0; @@ -1344,6 +1360,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId); tsem_post(&tmq->rspSem); + taosReleaseRef(tmqMgmt.rsetId, refId); + return 0; CREATE_MSG_FAIL: @@ -1352,6 +1370,8 @@ CREATE_MSG_FAIL: } tsem_post(&tmq->rspSem); + taosReleaseRef(tmqMgmt.rsetId, refId); + return -1; } @@ -1389,6 +1409,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, + .numOfRows = 0, }; taosArrayPush(pTopic->vgs, &clientVg); @@ -1540,6 +1561,8 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { } END: + taosReleaseRef(tmqMgmt.rsetId, pParam->refId); + if (!async) { tsem_post(&pParam->rspSem); } else { @@ -1579,7 +1602,7 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { +SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); pRspObj->resType = RES_TYPE__TMQ; @@ -1597,6 +1620,14 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } + // extract the rows in this data packet + for(int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); + int64_t rows = htobe64(pRetrieve->numOfRows); + pRspObj->resInfo.totalRows += rows; + pVg->numOfRows += rows; + } + return pRspObj; } @@ -1808,10 +1839,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { rspWrapper = NULL; continue; } else { // build rsp - SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); - tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pollRspWrapper->reqId); + SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg); + tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"PRId64" reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pRsp->resInfo.totalRows, pollRspWrapper->reqId); + tmq->totalRows += pRsp->resInfo.totalRows; taosFreeQitem(pollRspWrapper); return pRsp; } @@ -1864,12 +1896,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp void* pRsp = NULL; if (pollRspWrapper->taosxRsp.createTableNum == 0) { - pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg); } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); } - char buf[80]; tFormatOffset(buf, 80, &pVg->currentOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId, @@ -1957,9 +1988,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - /*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ - /*", left time %" PRId64,*/ - /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/ tsem_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck @@ -1975,6 +2003,24 @@ int32_t tmq_consumer_close(tmq_t* tmq) { return rsp; } + int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); + tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", + tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId); + for (int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i); + + tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i); + int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); + for (int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); + tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + } + } + + tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId); + int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); while (1) { @@ -2177,7 +2223,9 @@ int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { } int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId); + int64_t refId = pParamSet->refId; + + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { if (!pParamSet->async) { tsem_destroy(&pParamSet->rspSem); @@ -2205,6 +2253,8 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); #endif + + taosReleaseRef(tmqMgmt.rsetId, refId); return 0; }