diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index 45bf69c4501805c5b32623f5d3346370a8bd9a68..50d1e78c895a88b16878f7c56f6237f28c16de27 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -24,6 +24,8 @@
 #include "tref.h"
 #include "ttimer.h"
 
+#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
+
 struct SMqMgmt {
   int8_t  inited;
   tmr_h   timer;
@@ -68,18 +70,16 @@ struct tmq_conf_t {
 };
 
 struct tmq_t {
-  int64_t refId;
-  // conf
-  char    groupId[TSDB_CGROUP_LEN];
-  char    clientId[256];
-  int8_t  withTbName;
-  int8_t  useSnapshot;
-  int8_t  autoCommit;
-  int32_t autoCommitInterval;
-  int32_t resetOffsetCfg;
-  uint64_t consumerId;
-  bool    hbBgEnable;
-
+  int64_t  refId;
+  char     groupId[TSDB_CGROUP_LEN];
+  char     clientId[256];
+  int8_t   withTbName;
+  int8_t   useSnapshot;
+  int8_t   autoCommit;
+  int32_t  autoCommitInterval;
+  int32_t  resetOffsetCfg;
+  uint64_t consumerId;
+  bool     hbBgEnable;
 
   tmq_commit_cb* commitCb;
   void*          commitCbUserParam;
@@ -93,11 +93,10 @@ struct tmq_t {
   int64_t pollCnt;
 
   // timer
-  tmr_h hbLiveTimer;
-  tmr_h epTimer;
-  tmr_h reportTimer;
-  tmr_h commitTimer;
-
+  tmr_h       hbLiveTimer;
+  tmr_h       epTimer;
+  tmr_h       reportTimer;
+  tmr_h       commitTimer;
   STscObj*    pTscObj;       // connection
   SArray*     clientTopics;  // SArray<SMqClientTopic>
   STaosQueue* mqueue;        // queue of rsp
@@ -149,6 +148,7 @@ typedef struct {
   SMqClientVg*    vgHandle;
   SMqClientTopic* topicHandle;
   uint64_t        reqId;
+  SEpSet*         pEpset;
   union {
     SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
@@ -403,65 +403,42 @@ static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t
   return NULL;
 }
 
+// Two issues are worth noting here:
+// 1. epset update: no need to handle it in this callback, since the poll response handles it automatically.
+// 2. commit failure: this one still needs to be resolved.
 static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
   SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
   SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
 
-  if (code != TSDB_CODE_SUCCESS) {  // if commit offset failed, let's try again
-    taosThreadMutexLock(&pParam->pTmq->lock);
-    int32_t      numOfVgroups, index;
-    SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
-
-    if (pVg == NULL) {
-      tscDebug("consumer:0x%" PRIx64
-               " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
-               pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
-    } else {  // let's retry the commit
-      int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
-      if (code1 != TSDB_CODE_SUCCESS) {  // retry failed.
-        tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
-                 " retry failed, ignore this commit. code:%s ordinal:%d/%d",
-                 pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
-                 tstrerror(terrno), index + 1, numOfVgroups);
-      }
-    }
-
-    taosThreadMutexUnlock(&pParam->pTmq->lock);
-
-    taosMemoryFree(pParam->pOffset);
-    taosMemoryFree(pBuf->pData);
-    taosMemoryFree(pBuf->pEpSet);
-
-    tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
-    return 0;
-  }
-
-  // todo replace the pTmq with refId
-  taosThreadMutexLock(&pParam->pTmq->lock);
-  tmq_t*  pTmq = pParam->pTmq;
-  int32_t index = 0, numOfVgroups = 0;
-
-  SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
-  if (pVg == NULL) {
-    tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d",
-             pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
-  } else {  // update the epset if needed
-    if (pBuf->pEpSet != NULL) {
-      SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
-      SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
-
-      tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d",
-               pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port,
-               index + 1, numOfVgroups);
-
-      pVg->epSet = *pBuf->pEpSet;
-    }
-
-    tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
-             pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
-  }
-
-  taosThreadMutexUnlock(&pParam->pTmq->lock);
+//  if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
+//    taosThreadMutexLock(&pParam->pTmq->lock);
+//    int32_t numOfVgroups, index;
+//    SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
+//    if (pVg == NULL) {
+//      tscDebug("consumer:0x%" PRIx64
+//               " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
+//               pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
+//    } else { // let's retry the commit
+//      int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
+//      if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
+//        tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
+//                 " retry failed, ignore this commit. code:%s ordinal:%d/%d",
+//                 pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
+//                 tstrerror(terrno), index + 1, numOfVgroups);
+//      }
+//    }
+//
+//    taosThreadMutexUnlock(&pParam->pTmq->lock);
+//
+//    taosMemoryFree(pParam->pOffset);
+//    taosMemoryFree(pBuf->pData);
+//    taosMemoryFree(pBuf->pEpSet);
+//
+//    tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
+//    return 0;
+//  }
+//
+//  // todo replace the pTmq with refId
 
   taosMemoryFree(pParam->pOffset);
   taosMemoryFree(pBuf->pData);
@@ -892,15 +869,21 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
     tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
   } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
     SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
+    taosMemoryFreeClear(pRsp->pEpset);
+
     taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
     taosArrayDestroy(pRsp->dataRsp.blockDataLen);
     taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
     taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
   } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
     SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
+    taosMemoryFreeClear(pRsp->pEpset);
+
     taosMemoryFree(pRsp->metaRsp.metaRsp);
   } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
     SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
+    taosMemoryFreeClear(pRsp->pEpset);
+
     taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
     taosArrayDestroy(pRsp->taosxRsp.blockDataLen);
     taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
@@ -1321,7 +1304,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
     pRspWrapper->vgHandle = pVg;
     pRspWrapper->topicHandle = pTopic;
     pRspWrapper->reqId = requestId;
+    pRspWrapper->pEpset = pMsg->pEpSet;
+    pMsg->pEpSet = NULL;
 
     if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
       SDecoder decoder;
       tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@@ -1350,7 +1335,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
   }
 
   taosMemoryFree(pMsg->pData);
-  taosMemoryFree(pMsg->pEpSet);
 
   taosWriteQitem(tmq->mqueue, pRspWrapper);
   tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
@@ -1516,6 +1500,14 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
   if (head->epoch <= epoch) {
     tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
              tmq->consumerId, head->epoch, epoch);
+    if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
+      SMqAskEpRsp rsp;
+      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
+      int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
+      atomic_store_8(&tmq->status, flag);
+      tDeleteSMqAskEpRsp(&rsp);
+    }
+
     goto END;
   }
 
@@ -1702,13 +1694,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
     for (int j = 0; j < numOfVg; j++) {
       SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
-      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < 100) {  // less than 100ms
+      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than EMPTY_BLOCK_POLL_IDLE_DURATION ms
         tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId,
                  tmq->epoch, pVg->vgId);
         continue;
       }
 
-      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
+      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
       if (vgStatus == TMQ_VG_STATUS__WAIT) {
         int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
         tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
@@ -1731,6 +1723,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
     }
   }
 
+  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
   return 0;
 }
 
@@ -1782,31 +1775,43 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
       /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
       int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
 
-      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
+      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
+
+      if (pDataRsp->head.epoch == consumerEpoch) {
+        // todo fix it: race condition
         SMqClientVg* pVg = pollRspWrapper->vgHandle;
-        pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
+
+        // update the epset
+        if (pollRspWrapper->pEpset != NULL) {
+          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
+          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
+          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId,
+                   pVg->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
+          pVg->epSet = *pollRspWrapper->pEpset;
+        }
+
+        pVg->currentOffset = pDataRsp->rspOffset;
         atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
 
-        if (pollRspWrapper->dataRsp.blockNum == 0) {
-          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
-                   pollRspWrapper->reqId);
+        char buf[80];
+        tFormatOffset(buf, 80, &pDataRsp->rspOffset);
+        if (pDataRsp->blockNum == 0) {
+          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
+                   pVg->vgId, buf, pollRspWrapper->reqId);
           taosFreeQitem(pollRspWrapper);
           rspWrapper = NULL;
           continue;
-        }
+        } else {  // build rsp
+          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
+          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%" PRIx64,
+                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pollRspWrapper->reqId);
 
-        // build rsp
-        char buf[80];
-        tFormatOffset(buf, 80, &pVg->currentOffset);
-        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
-        tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId,
-                 pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
-
-        taosFreeQitem(pollRspWrapper);
-        return pRsp;
+          taosFreeQitem(pollRspWrapper);
+          return pRsp;
+        }
       } else {
         tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
-                 tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
+                 tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
         tmqFreeRspWrapper(rspWrapper);
         taosFreeQitem(pollRspWrapper);
       }
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 53421aa45cf86e340c24341d366c259cb5f15610..2f560910d430048a653aa0209c1aeb1e6e3a56bb 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -101,7 +101,7 @@ void mndRebCntDec() {
     int32_t newVal = val - 1;
     int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
     if (oldVal == val) {
-      mInfo("rebalance trans end, rebalance counter:%d", newVal);
+      mDebug("rebalance trans end, rebalance counter:%d", newVal);
       break;
     }
   }
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index d3ba7ad6087397c265dd1d3bf0d8b8880230e391..718b5979a156dae1cb043d2dc1a27fca2801fbb3 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -510,7 +510,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
 int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
   vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
   if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
-       pMsg->msgType == TDMT_VND_BATCH_META) &&
+       pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) &&
      !syncIsReadyForRead(pVnode->sync)) {
     vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
     return 0;
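
The central change in clientTmq.c is how the poll response's SEpSet travels: tmqPollCb now steals pMsg->pEpSet into the new pEpset field of the rsp wrapper and nulls the source, so the RPC buffer no longer owns it; tmqHandleAllRsp applies it to the vgroup's epSet when present, and tmqFreeRspWrapper frees it on the discard paths. Below is a minimal standalone sketch of this steal-and-null ownership transfer; EpSet, DataBuf, RspWrapper, takeEpset, and applyEpset are simplified illustrative stand-ins, not the real TDengine definitions:

```c
#include <stdio.h>
#include <stdlib.h>

// Simplified stand-ins for SEpSet / SDataBuf / SMqPollRspWrapper; the names
// and fields here are illustrative, not the real TDengine types.
typedef struct { char fqdn[64]; int port; } EpSet;
typedef struct { void* pData; EpSet* pEpSet; } DataBuf;
typedef struct { EpSet* pEpset; } RspWrapper;

// Steal the epset pointer from the RPC buffer. After this call, freeing the
// buffer cannot double-free the epset -- mirrors
// pRspWrapper->pEpset = pMsg->pEpSet; pMsg->pEpSet = NULL; in tmqPollCb.
static void takeEpset(RspWrapper* w, DataBuf* buf) {
  w->pEpset = buf->pEpSet;
  buf->pEpSet = NULL;
}

// Consume the epset if the server sent one (as tmqHandleAllRsp does), then
// release it; on the discard path tmqFreeRspWrapper frees it instead.
static void applyEpset(RspWrapper* w, EpSet* dst) {
  if (w->pEpset != NULL) {
    *dst = *w->pEpset;  // copy by value, like pVg->epSet = *pollRspWrapper->pEpset
    free(w->pEpset);    // the wrapper owns the pointer, so the wrapper frees it
    w->pEpset = NULL;
  }
}

int main(void) {
  EpSet* ep = calloc(1, sizeof(EpSet));
  snprintf(ep->fqdn, sizeof(ep->fqdn), "node1");
  ep->port = 6030;

  DataBuf    buf = {.pData = NULL, .pEpSet = ep};
  RspWrapper w = {0};
  takeEpset(&w, &buf);  // buf.pEpSet is NULL now; the wrapper owns the epset

  EpSet current = {0};
  applyEpset(&w, &current);
  printf("active ep: %s:%d\n", current.fqdn, current.port);
  return 0;
}
```

With a single owner at every point in the pipeline, the removal of taosMemoryFree(pMsg->pEpSet) in tmqPollCb is safe: the pointer is either consumed by tmqHandleAllRsp or cleared by tmqFreeRspWrapper, never both.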
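The new EMPTY_BLOCK_POLL_IDLE_DURATION macro names the previously bare 100 in tmqPollImpl: when a poll returns an empty block, the vgroup's emptyBlockReceiveTs is stamped, and subsequent poll rounds skip that vgroup until the window elapses. A rough sketch of the check, assuming emptyBlockReceiveTs holds a millisecond timestamp; nowMs and shouldSkipPoll are illustrative stand-ins, with nowMs standing in for taosGetTimestampMs():

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

#define EMPTY_BLOCK_POLL_IDLE_DURATION 100  // ms, same value as the new macro

// Stand-in for taosGetTimestampMs(): wall-clock time in milliseconds.
static int64_t nowMs(void) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

// True when the vgroup should be skipped: an empty block arrived less than
// EMPTY_BLOCK_POLL_IDLE_DURATION ms ago, so polling again now is wasted work.
static bool shouldSkipPoll(int64_t emptyBlockReceiveTs) {
  return nowMs() - emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION;
}

int main(void) {
  int64_t lastEmpty = nowMs();  // pretend an empty block just arrived
  printf("skip now? %s\n", shouldSkipPoll(lastEmpty) ? "yes" : "no");
  return 0;
}
```

The effect is a simple client-side backoff: a vgroup that keeps returning empty blocks is polled at most roughly ten times per second instead of in a tight loop.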