diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 5cd0adf0841eecfdf9ba1e4d45ea73864f0cc397..4a263b5bce93b2be57fd45ae09d7e653e9fc0a3b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -96,26 +96,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, pSub->subType = pTopic->subType; pSub->withMeta = pTopic->withMeta; - if (pSub->unassignedVgs->size != 0 || taosHashGetSize(pSub->consumerHash) != 0) { - tDeleteSubscribeObj(pSub); - taosMemoryFree(pSub); - terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; - return NULL; - } - if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { tDeleteSubscribeObj(pSub); taosMemoryFree(pSub); return NULL; } - if (pSub->unassignedVgs->size <= 0 || taosHashGetSize(pSub->consumerHash) != 0) { - tDeleteSubscribeObj(pSub); - taosMemoryFree(pSub); - terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; - return NULL; - } - return pSub; } @@ -229,24 +215,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR int32_t actualRemoved = 0; for (int32_t i = 0; i < removedNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); - if (consumerId <= 0) { - mError("sub:%s, mq rebalance cunsumerId:%" PRId64 " <= 0", sub, consumerId); - continue; - } SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - if (pConsumerEp == NULL) { - mError("sub:%s, mq rebalance ep is null, cunsumberId:%" PRId64, sub, consumerId); - continue; - } if (pConsumerEp) { - if (consumerId != pConsumerEp->consumerId) { - mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " not matched saved:%" PRId64, sub, consumerId, - pConsumerEp->consumerId); - continue; - } - actualRemoved++; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { @@ -305,10 +277,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - if (pConsumerEp->consumerId <= 0) { - mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId); - continue; - } int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); // all old consumers still existing are touched @@ -356,10 +324,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); - if (consumerId <= 0) { - mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, consumerId); - continue; - } SMqConsumerEp newConsumerEp; newConsumerEp.consumerId = consumerId; @@ -379,10 +343,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - if (pConsumerEp->consumerId <= 0) { - mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId); - continue; - } // push until equal minVg while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { @@ -417,10 +377,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pConsumerEp = (SMqConsumerEp *)pIter; - if (pConsumerEp == NULL || pConsumerEp->consumerId <= 0) { - mError("sub:%s, mq rebalance cunsumberId invalid", sub); - continue; - } if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { break; @@ -446,10 +402,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pHash, pIter); if (pIter == NULL) break; pRebOutput = (SMqRebOutputVg *)pIter; - if (pRebOutput->newConsumerId != 1) { - mError("sub:%s, mq rebalance output consumerId:%" PRId64 " not -1", sub, pRebOutput->newConsumerId); - continue; - } taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); @@ -534,7 +486,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); - if (consumerId <= 0) continue; SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); @@ -557,7 +508,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu consumerNum = taosArrayGetSize(pOutput->removedConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); - if (consumerId <= 0) continue; SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); @@ -631,6 +581,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { taosRLockLatch(&pTopic->lock); rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key); + + if (rebOutput.pSub == NULL) { + mError("mq rebalance %s failed create sub since %s, abort", pRebInfo->key, terrstr()); + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + continue; + } memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); taosRUnLockLatch(&pTopic->lock);