提交 a1682664 编写于 作者: H Haojun Liao

fix(tmq): disable non-leader vnode responsing the poll request.

上级 52576071
......@@ -24,6 +24,8 @@
#include "tref.h"
#include "ttimer.h"
struct SMqMgmt {
int8_t inited;
tmr_h timer;
......@@ -68,18 +70,16 @@ struct tmq_conf_t {
struct tmq_t {
int64_t refId;
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;
int64_t refId;
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
......@@ -93,11 +93,10 @@ struct tmq_t {
int64_t pollCnt;
// timer
tmr_h hbLiveTimer;
tmr_h epTimer;
tmr_h reportTimer;
tmr_h commitTimer;
tmr_h hbLiveTimer;
tmr_h epTimer;
tmr_h reportTimer;
tmr_h commitTimer;
STscObj* pTscObj; // connection
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of rsp
......@@ -149,6 +148,7 @@ typedef struct {
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
uint64_t reqId;
SEpSet* pEpset;
union {
SMqDataRsp dataRsp;
SMqMetaRsp metaRsp;
......@@ -403,65 +403,42 @@ static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t
return NULL;
// Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
int32_t numOfVgroups, index;
SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
if (pVg == NULL) {
tscDebug("consumer:0x%" PRIx64
" subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
} else { // let's retry the commit
int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
" retry failed, ignore this commit. code:%s ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
tstrerror(terrno), index + 1, numOfVgroups);
tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
return 0;
// todo replace the pTmq with refId
tmq_t* pTmq = pParam->pTmq;
int32_t index = 0, numOfVgroups = 0;
SMqClientVg* pVg = foundClientVg(pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
if (pVg == NULL) {
tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d has been transferred to other consumer, ordinal:%d/%d",
pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
} else { // update the epset if needed
if (pBuf->pEpSet != NULL) {
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
tscDebug("consumer:0x%" PRIx64 " subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d, ordinal:%d/%d",
pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port,
index + 1, numOfVgroups);
pVg->epSet = *pBuf->pEpSet;
tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId,
pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups);
// if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
// taosThreadMutexLock(&pParam->pTmq->lock);
// int32_t numOfVgroups, index;
// SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index, &numOfVgroups);
// if (pVg == NULL) {
// tscDebug("consumer:0x%" PRIx64
// " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code), index + 1, numOfVgroups);
// } else { // let's retry the commit
// int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
// if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
// tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
// " retry failed, ignore this commit. code:%s ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->committedOffset.version,
// tstrerror(terrno), index + 1, numOfVgroups);
// }
// }
// taosThreadMutexUnlock(&pParam->pTmq->lock);
// taosMemoryFree(pParam->pOffset);
// taosMemoryFree(pBuf->pData);
// taosMemoryFree(pBuf->pEpSet);
// tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// return 0;
// }
// // todo replace the pTmq with refId
......@@ -892,15 +869,21 @@ static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosArrayDestroyP(pRsp->dataRsp.blockData, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockTbName, taosMemoryFree);
taosArrayDestroyP(pRsp->dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosArrayDestroyP(pRsp->taosxRsp.blockData, taosMemoryFree);
taosArrayDestroyP(pRsp->taosxRsp.blockTbName, taosMemoryFree);
......@@ -1321,7 +1304,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
......@@ -1350,7 +1335,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosWriteQitem(tmq->mqueue, pRspWrapper);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
......@@ -1516,6 +1500,14 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (head->epoch <= epoch) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag);
goto END;
......@@ -1702,13 +1694,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < 100) { // less than 100ms
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
......@@ -1731,6 +1723,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
return 0;
......@@ -1782,31 +1775,43 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) {
// todo fix it: race condition
SMqClientVg* pVg = pollRspWrapper->vgHandle;
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
// update the epset
if (pollRspWrapper->pEpset != NULL) {
SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId,
pVg->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pVg->epSet = *pollRspWrapper->pEpset;
pVg->currentOffset = pDataRsp->rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
char buf[80];
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, reqId:0x%" PRIx64, tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->reqId);
rspWrapper = NULL;
} else { // build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, pollRspWrapper->reqId);
// build rsp
char buf[80];
tFormatOffset(buf, 80, &pVg->currentOffset);
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId,
pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId);
return pRsp;
return pRsp;
} else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmq->consumerId, pDataRsp->head.epoch, consumerEpoch);
......@@ -101,7 +101,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) {
mInfo("rebalance trans end, rebalance counter:%d", newVal);
mDebug("rebalance trans end, rebalance counter:%d", newVal);
......@@ -510,7 +510,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册