提交 37fea8cf 编写于 作者: H Haojun Liao

fix(tmq): update the epset when the leader of vnode changed.

上级 9a5b7ce2
...@@ -27,13 +27,14 @@ typedef struct SCorEpSet { ...@@ -27,13 +27,14 @@ typedef struct SCorEpSet {
SEpSet epSet; SEpSet epSet;
} SCorEpSet; } SCorEpSet;
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp); int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port); void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2); bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
void epsetAssign(SEpSet* dst, const SEpSet* pSrc);
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet); void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
SEpSet getEpSet_s(SCorEpSet* pEpSet); SEpSet getEpSet_s(SCorEpSet* pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1830,7 +1830,7 @@ typedef struct { ...@@ -1830,7 +1830,7 @@ typedef struct {
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg; } SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
typedef struct { typedef struct {
uint64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[256];
SArray* topicNames; // SArray<char**> SArray* topicNames; // SArray<char**>
......
...@@ -58,15 +58,14 @@ struct tmq_list_t { ...@@ -58,15 +58,14 @@ struct tmq_list_t {
}; };
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[256];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
int8_t withTbName; int8_t withTbName;
int8_t snapEnable; int8_t snapEnable;
int32_t snapBatchSize; int32_t snapBatchSize;
bool hbBgEnable; bool hbBgEnable;
uint16_t port; uint16_t port;
int32_t autoCommitInterval; int32_t autoCommitInterval;
char* ip; char* ip;
...@@ -213,6 +212,7 @@ typedef struct { ...@@ -213,6 +212,7 @@ typedef struct {
typedef struct { typedef struct {
SMqCommitCbParamSet* params; SMqCommitCbParamSet* params;
STqOffset* pOffset; STqOffset* pOffset;
SMqClientVg* pMqVg;
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/ /*char topicName[TSDB_TOPIC_FNAME_LEN];*/
/*int32_t vgId;*/ /*int32_t vgId;*/
} SMqCommitCbParam; } SMqCommitCbParam;
...@@ -440,6 +440,17 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -440,6 +440,17 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
} }
#endif #endif
// there may be race condition. fix it
if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) {
SMqClientVg* pMqVg = pParam->pMqVg;
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pParam->pMqVg->epSet = *pBuf->pEpSet;
}
taosMemoryFree(pParam->pOffset); taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet); taosMemoryFree(pBuf->pEpSet);
...@@ -448,7 +459,6 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -448,7 +459,6 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
* pOffset->version);*/ * pOffset->version);*/
tmqCommitRspCountDown(pParamSet); tmqCommitRspCountDown(pParamSet);
return 0; return 0;
} }
...@@ -498,6 +508,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -498,6 +508,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->pOffset = pOffset; pParam->pOffset = pOffset;
pParam->pMqVg = pVg; // there may be an race condition
// build send info // build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
...@@ -518,7 +529,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -518,7 +529,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey, tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port); pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
// TODO: put into cb // TODO: put into cb, the commit offset should be move to the callback function
pVg->committedOffset = pVg->currentOffset; pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestId = generateRequestId();
......
...@@ -60,6 +60,19 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) { ...@@ -60,6 +60,19 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
return true; return true;
} }
void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
if (pSrc == NULL || pDst == NULL) {
return;
}
pDst->inUse = pSrc->inUse;
pDst->numOfEps = pSrc->numOfEps;
for (int32_t i = 0; i < pSrc->numOfEps; ++i) {
pDst->eps[i].port = pSrc->eps[i].port;
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
}
}
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) { void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
taosCorBeginWrite(&pEpSet->version); taosCorBeginWrite(&pEpSet->version);
pEpSet->epSet = *pNewEpSet; pEpSet->epSet = *pNewEpSet;
......
...@@ -413,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ...@@ -413,7 +413,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
// 2. check epoch, only send ep info when epoches do not match // 2. check epoch, only send ep info when epochs do not match
if (epoch != serverEpoch) { if (epoch != serverEpoch) {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch); mInfo("process ask ep, consumer:%" PRId64 "(epoch %d), server epoch %d", consumerId, epoch, serverEpoch);
......
...@@ -2303,6 +2303,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -2303,6 +2303,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
pCtx->epSet = *pEpSet; pCtx->epSet = *pEpSet;
pCtx->origEpSet = *pEpSet;
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册