提交 279fe080 编写于 作者: wmmhello's avatar wmmhello

fix:[TD-23972] push subscribe msg to vnode even though consumer not change

上级 44609743
...@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri ...@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) { const SMqRebOutputVg *pRebVg) {
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return -1; // return -1;
} // }
void *buf; void *buf;
int32_t tlen; int32_t tlen;
......
...@@ -539,10 +539,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -539,10 +539,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else { } else {
if (pHandle->consumerId == req.newConsumerId) { // do nothing if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
} else { } else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId); req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
} }
// kill executing task // kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task; qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
...@@ -551,8 +554,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -551,8 +554,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
atomic_add_fetch_32(&pHandle->epoch, 1);
// remove if it has been register in the push manager, and return one empty block to consumer // remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册