提交 f7ab8dab 编写于 作者: wmmhello's avatar wmmhello

fix:[TD-23972] push subscribe msg to vnode even though consumer not change

上级 238254e4
...@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri ...@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) { const SMqRebOutputVg *pRebVg) {
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return -1; // return -1;
} // }
void *buf; void *buf;
int32_t tlen; int32_t tlen;
...@@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { ...@@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
} }
} }
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = pConsumerEp->consumerId,
.pVgEp = pVgEp,
};
taosArrayPush(pOutput->rebVgs, &outputVg);
}
}
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
int32_t imbConsumerNum) { int32_t imbConsumerNum) {
const char *pSubKey = pOutput->pSub->key; const char *pSubKey = pOutput->pSub->key;
...@@ -318,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas ...@@ -318,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
} }
} }
} }
putNoTransferToOutput(pOutput, pConsumerEp);
} }
} }
......
...@@ -569,20 +569,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -569,20 +569,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.newConsumerId); req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
// atomic_store_32(&pHandle->epoch, 0); // atomic_store_32(&pHandle->epoch, 0);
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
}
// remove if it has been register in the push manager, and return one empty block to consumer
taosWLockLatch(&pTq->lock);
tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
} }
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
}
taosWLockLatch(&pTq->lock);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
}
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
} }
end: end:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册