提交 224d87b1 编写于 作者: wmmhello's avatar wmmhello

fix:cosume null if rebalance

上级 4321012a
...@@ -1363,7 +1363,6 @@ CREATE_MSG_FAIL: ...@@ -1363,7 +1363,6 @@ CREATE_MSG_FAIL:
typedef struct SVgroupSaveInfo { typedef struct SVgroupSaveInfo {
STqOffsetVal offset; STqOffsetVal offset;
int64_t numOfRows; int64_t numOfRows;
int32_t vgStatus;
} SVgroupSaveInfo; } SVgroupSaveInfo;
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
...@@ -1399,7 +1398,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic ...@@ -1399,7 +1398,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.currentOffset = offsetNew, .currentOffset = offsetNew,
.vgId = pVgEp->vgId, .vgId = pVgEp->vgId,
.epSet = pVgEp->epSet, .epSet = pVgEp->epSet,
.vgStatus = pInfo != NULL ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgStatus = TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0, .vgSkipCnt = 0,
.emptyBlockReceiveTs = 0, .emptyBlockReceiveTs = 0,
.numOfRows = numOfRows, .numOfRows = numOfRows,
...@@ -1458,7 +1457,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1458,7 +1457,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf); vgKey, buf);
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
} }
} }
......
...@@ -368,6 +368,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -368,6 +368,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
// 3. update the epoch value
taosWLockLatch(&pTq->lock);
int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
reqEpoch);
pHandle->epoch = reqEpoch;
}
taosWUnLockLatch(&pTq->lock);
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册