diff --git a/include/common/tmisce.h b/include/common/tmisce.h index b9f5cf5b91f55a8f9ca14194b41673c8506bb3fa..bc6558900c2f370c1e29ed09d430587c7bcf531d 100644 --- a/include/common/tmisce.h +++ b/include/common/tmisce.h @@ -27,13 +27,14 @@ typedef struct SCorEpSet { SEpSet epSet; } SCorEpSet; +#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse])) int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); -bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2); - -void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet); -SEpSet getEpSet_s(SCorEpSet* pEpSet); +bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2); +void epsetAssign(SEpSet* dst, const SEpSet* pSrc); +void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet); +SEpSet getEpSet_s(SCorEpSet* pEpSet); #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ba13195b9755bf4797553c2f29763f491439ca58..335c57afd36af2ce7a46a89e3a4cfdc479c9af76 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1830,7 +1830,7 @@ typedef struct { } SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; typedef struct { - uint64_t consumerId; + int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; char clientId[256]; SArray* topicNames; // SArray diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fb8cdcfde375c3f83411d6be55b0761f0c4168ed..1e98f62f36de555796d6b5581e64273eb4998530 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -58,15 +58,14 @@ struct tmq_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[TSDB_CGROUP_LEN]; - int8_t autoCommit; - int8_t resetOffset; - int8_t withTbName; - int8_t snapEnable; - int32_t snapBatchSize; - bool hbBgEnable; - + char clientId[256]; + char groupId[TSDB_CGROUP_LEN]; + int8_t autoCommit; + int8_t resetOffset; + int8_t withTbName; + int8_t snapEnable; + int32_t snapBatchSize; + bool hbBgEnable; uint16_t port; int32_t autoCommitInterval; char* ip; @@ -213,6 +212,7 @@ typedef struct { typedef struct { SMqCommitCbParamSet* params; STqOffset* pOffset; + SMqClientVg* pMqVg; /*char topicName[TSDB_TOPIC_FNAME_LEN];*/ /*int32_t vgId;*/ } SMqCommitCbParam; @@ -440,6 +440,17 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } #endif + // there may be race condition. fix it + if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) { + SMqClientVg* pMqVg = pParam->pMqVg; + + SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); + SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet)); + uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId, + pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); + pParam->pMqVg->epSet = *pBuf->pEpSet; + } + taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -448,7 +459,6 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { * pOffset->version);*/ tmqCommitRspCountDown(pParamSet); - return 0; } @@ -498,6 +508,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pParam->params = pParamSet; pParam->pOffset = pOffset; + pParam->pMqVg = pVg; // there may be an race condition // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -518,7 +529,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port); - // TODO: put into cb + // TODO: put into cb, the commit offset should be move to the callback function pVg->committedOffset = pVg->currentOffset; pMsgSendInfo->requestId = generateRequestId(); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index dfb1917fcfbeddf5814a6689bba8864be44c180e..59afce1bbbf342ac61392c8e26dd44628c9505de 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -60,6 +60,19 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) { return true; } +void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) { + if (pSrc == NULL || pDst == NULL) { + return; + } + + pDst->inUse = pSrc->inUse; + pDst->numOfEps = pSrc->numOfEps; + for (int32_t i = 0; i < pSrc->numOfEps; ++i) { + pDst->eps[i].port = pSrc->eps[i].port; + tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn)); + } +} + void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) { taosCorBeginWrite(&pEpSet->version); pEpSet->epSet = *pNewEpSet; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 40feb22906dea74ae66c659a76c952d4e74b20a9..d167c9c09ade88c6e2692bae07858a9591e3ac88 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -413,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); - // 2. check epoch, only send ep info when epoches do not match + // 2. check epoch, only send ep info when epochs do not match if (epoch != serverEpoch) { taosRLockLatch(&pConsumer->lock); mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7e1aeafaad8bb25a88ba248de727deda7932f4df..8faaf041083e1668bc1e7aa7c38d09bd2aef7671 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2303,6 +2303,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; + pCtx->origEpSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType;