提交 af423685 编写于 作者: H Haojun Liao

fix(tmq): handle the commit failure.

上级 9e860f02
......@@ -208,6 +208,8 @@ typedef struct {
static int32_t tmqAskEp(tmq_t* tmq, bool async);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups);
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
tmq_conf_t* tmq_conf_new() {
......@@ -378,51 +380,90 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
return container->pData;
}
static void updateVgEpset(tmq_t* pTmq, SMqCommitCbParam* pParam, SEpSet* pEpSet) {
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, int32_t* numOfVgroups) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
*index = -1;
*numOfVgroups = 0;
for(int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, i);
if (strcmp(pTopic->topicName, pParam->topicName) != 0) {
SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
if (strcmp(pTopic->topicName, pName) != 0) {
continue;
}
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for(int32_t j = 0; j < numOfVgs; ++j) {
*numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < (*numOfVgroups); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg->vgId == pParam->vgId) {
SEp* pEp = GET_ACTIVE_EP(pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pClientVg->epSet = *pEpSet;
break;
if (pClientVg->vgId == vgId) {
*index = j;
return pClientVg;
}
}
break;
}
return NULL;
}
// TODO: retry sending the commit if it fails
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
#if 0
if (code == 0) {
taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
} else {
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
}
#endif
// update the epset if needed
if (pBuf->pEpSet != NULL) {
if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
taosThreadMutexLock(&pParam->pTmq->lock);
updateVgEpset(pParam->pTmq, pParam, pBuf->pEpSet);
int32_t numOfVgroups, index;
SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
if (pVg == NULL) {
tscDebug("consumer:0x%" PRIx64
" subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
} else { // let's retry the commit
int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
" retry failed, ignore this commit. code:%s ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
tstrerror(terrno), index + 1, numOfVgroups);
}
}
taosThreadMutexUnlock(&pParam->pTmq->lock);
taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
return 0;
}
// todo replace the pTmq with refId
taosThreadMutexLock(&pParam->pTmq->lock);
tmq_t* pTmq = pParam->pTmq;
int32_t index = 0, numOfVgroups = 0;
SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
if (pVg == NULL) {
tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
} else { // update the epset if needed
if (pBuf->pEpSet != NULL) {
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d",
pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port,
index + 1, numOfVgroups);
pVg->epSet = *pBuf->pEpSet;
}
// update the offset value.
pVg->committedOffset = pVg->currentOffset;
}
taosThreadMutexUnlock(&pParam->pTmq->lock);
taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
......@@ -431,7 +472,8 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
return 0;
}
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups) {
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -498,11 +540,9 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopic
};
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d", tmq->consumerId,
pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
// TODO: put into cb, the commit offset should be moved to the callback function
pVg->committedOffset = pVg->currentOffset;
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d, ordinal:%d/%d",
tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
pEp->port, index + 1, totalVgroups);
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
......@@ -557,15 +597,19 @@ static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, t
taosThreadMutexLock(&tmq->lock);
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, topic) != 0) continue;
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
if (strcmp(pTopic->topicName, topic) != 0) {
continue;
}
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId != vgId) {
continue;
}
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
if (tmqSendCommitReq(tmq, pVg, pTopic->topicName, pParamSet) < 0) {
if (doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups) < 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
goto FAIL;
......@@ -634,7 +678,6 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
taosThreadMutexLock(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
......@@ -644,14 +687,19 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
numOfVgroups);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg clientVg = *(SMqClientVg*)taosArrayGet(pTopic->vgs, j);
if (clientVg.currentOffset.type > 0 && !tOffsetEqual(&clientVg.currentOffset, &clientVg.committedOffset)) {
if (tmqSendCommitReq(tmq, &clientVg, pTopic->topicName, pParamSet) < 0) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno),
j + 1, numOfVgroups);
continue;
}
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, clientVg.vgId, clientVg.currentOffset.version, j + 1, numOfVgroups);
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
}
}
}
......@@ -1804,7 +1852,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
}
} else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false;
tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
taosFreeQitem(rspWrapper);
......@@ -1814,8 +1861,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
}
}
tscDebug("consumer:0x%" PRIx64 " handle the rsp completed", tmq->consumerId);
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册