diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b315432be1c2deb565af64160e244fe76304d623..51f336c3aa174c0dfd311826d5bd340c319a6b8d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -345,6 +345,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB) #define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC) #define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED) +#define TSDB_CODE_MND_INVALID_SUB_OPTION TAOS_DEF_ERROR_CODE(0, 0x03EE) #define TSDB_CODE_MND_IN_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x03EF) // mnode-stream diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 854be1c52dfd47174921fb3d5790be47ef1b800b..5cd0adf0841eecfdf9ba1e4d45ea73864f0cc397 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -96,8 +96,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, pSub->subType = pTopic->subType; pSub->withMeta = pTopic->withMeta; - ASSERT(pSub->unassignedVgs->size == 0); - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); + if (pSub->unassignedVgs->size != 0 || taosHashGetSize(pSub->consumerHash) != 0) { + tDeleteSubscribeObj(pSub); + taosMemoryFree(pSub); + terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; + return NULL; + } if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { tDeleteSubscribeObj(pSub); @@ -105,8 +109,12 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, return NULL; } - ASSERT(pSub->unassignedVgs->size > 0); - ASSERT(taosHashGetSize(pSub->consumerHash) == 0); + if (pSub->unassignedVgs->size <= 0 || taosHashGetSize(pSub->consumerHash) != 0) { + tDeleteSubscribeObj(pSub); + taosMemoryFree(pSub); + terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; + return NULL; + } return pSub; } @@ -144,7 +152,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { - ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId); + if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; + return -1; + } void *buf; int32_t tlen; @@ -155,8 +166,8 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM int32_t vgId = pRebVg->pVgEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { - ASSERT(0); taosMemoryFree(buf); + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -206,9 +217,9 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { } static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { - int32_t totalVgNum = pOutput->pSub->vgNum; - - mInfo("mq rebalance: subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); + int32_t totalVgNum = pOutput->pSub->vgNum; + const char *sub = pOutput->pSub->key; + mInfo("sub:%s, mq rebalance vgNum:%d", sub, pOutput->pSub->vgNum); // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); @@ -218,11 +229,24 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR int32_t actualRemoved = 0; for (int32_t i = 0; i < removedNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); - ASSERT(consumerId > 0); + if (consumerId <= 0) { + mError("sub:%s, mq rebalance cunsumerId:%" PRId64 " <= 0", sub, consumerId); + continue; + } + SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - ASSERT(pConsumerEp); + if (pConsumerEp == NULL) { + mError("sub:%s, mq rebalance ep is null, cunsumberId:%" PRId64, sub, consumerId); + continue; + } + if (pConsumerEp) { - ASSERT(consumerId == pConsumerEp->consumerId); + if (consumerId != pConsumerEp->consumerId) { + mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " not matched saved:%" PRId64, sub, consumerId, + pConsumerEp->consumerId); + continue; + } + actualRemoved++; int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { @@ -233,7 +257,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64, pVgEp->vgId, consumerId); + mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId); } taosArrayDestroy(pConsumerEp->vgs); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); @@ -241,7 +265,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pOutput->removedConsumers, &consumerId); } } - ASSERT(removedNum == actualRemoved); + + if (removedNum != actualRemoved) { + mError("sub:%s, mq rebalance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved); + } // if previously no consumer, there are vgs not assigned { @@ -254,7 +281,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from unassigned", pVgEp->vgId); + mInfo("sub:%s, mq rebalance remove vgId:%d from unassigned", sub, pVgEp->vgId); } } @@ -268,8 +295,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR minVgCnt = totalVgNum / afterRebConsumerNum; imbConsumerNum = totalVgNum % afterRebConsumerNum; } - mInfo("mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg", afterRebConsumerNum, - minVgCnt, imbConsumerNum); + mInfo("sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg", sub, + afterRebConsumerNum, minVgCnt, imbConsumerNum); // 4. first scan: remove consumer more than wanted, put to remove hash int32_t imbCnt = 0; @@ -278,7 +305,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - ASSERT(pConsumerEp->consumerId > 0); + if (pConsumerEp->consumerId <= 0) { + mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId); + continue; + } + int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); // all old consumers still existing are touched // TODO optimize: touch only consumer whose vgs changed @@ -298,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, + mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId, pConsumerEp->consumerId); } imbCnt++; @@ -313,7 +344,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR .pVgEp = pVgEp, }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, + mInfo("sub:%s, mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId, pConsumerEp->consumerId); } } @@ -325,13 +356,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); - ASSERT(consumerId > 0); + if (consumerId <= 0) { + mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, consumerId); + continue; + } + SMqConsumerEp newConsumerEp; newConsumerEp.consumerId = consumerId; newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("mq rebalance: add new consumer:%" PRId64, consumerId); + mInfo("sub:%s, mq rebalance add new consumer:%" PRId64, sub, consumerId); } } @@ -344,13 +379,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - ASSERT(pConsumerEp->consumerId > 0); + if (pConsumerEp->consumerId <= 0) { + mError("sub:%s, mq rebalance cunsumberId:%" PRId64 " <= 0", sub, pConsumerEp->consumerId); + continue; + } // push until equal minVg while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); - ASSERT(pRemovedIter); + mError("sub:%s, removed iter is null", sub); + continue; + pRebVg = (SMqRebOutputVg *)pRemovedIter; // push taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); @@ -361,7 +401,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } } - ASSERT(pIter == NULL); // 7. handle unassigned vg if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { // if has consumer, assign all left vg @@ -377,9 +416,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - ASSERT(pIter); pConsumerEp = (SMqConsumerEp *)pIter; - ASSERT(pConsumerEp->consumerId > 0); + if (pConsumerEp == NULL || pConsumerEp->consumerId <= 0) { + mError("sub:%s, mq rebalance cunsumberId invalid", sub); + continue; + } + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { break; } @@ -404,19 +446,23 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pHash, pIter); if (pIter == NULL) break; pRebOutput = (SMqRebOutputVg *)pIter; - ASSERT(pRebOutput->newConsumerId == -1); + if (pRebOutput->newConsumerId != 1) { + mError("sub:%s, mq rebalance output consumerId:%" PRId64 " not -1", sub, pRebOutput->newConsumerId); + continue; + } + taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); - mInfo("mq rebalance: unassign vgId:%d (second scan)", pRebOutput->pVgEp->vgId); + mInfo("sub:%s, mq rebalance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId); } } // 8. generate logs - mInfo("mq rebalance: calculation completed, rebalanced vg:"); + mInfo("sub:%s, mq rebalance calculation completed, rebalanced vg", sub); for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); - mInfo("mq rebalance: vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId, - pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); + mInfo("sub:%s, mq rebalance vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, sub, + pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } { void *pIter = NULL; @@ -425,10 +471,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("mq rebalance: final cfg: consumer %" PRId64 " has %d vg", pConsumerEp->consumerId, sz); + mInfo("sub:%s, mq rebalance final cfg: consumer %" PRId64 " has %d vg", sub, pConsumerEp->consumerId, sz); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("mq rebalance: final cfg: vg %d to consumer %" PRId64 "", pVgEp->vgId, pConsumerEp->consumerId); + mInfo("sub:%s, mq rebalance final cfg: vg %d to consumer %" PRId64 "", sub, pVgEp->vgId, + pConsumerEp->consumerId); } } } @@ -487,7 +534,8 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); - ASSERT(consumerId > 0); + if (consumerId <= 0) continue; + SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE__ADD; @@ -497,7 +545,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu taosArrayPush(pConsumerNew->rebNewTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - ASSERT(0); tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); goto REB_FAIL; @@ -510,7 +557,8 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu consumerNum = taosArrayGetSize(pOutput->removedConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); - ASSERT(consumerId > 0); + if (consumerId <= 0) continue; + SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; @@ -520,7 +568,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - ASSERT(0); tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); goto REB_FAIL; @@ -577,7 +624,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { char cgroup[TSDB_CGROUP_LEN]; mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - /*ASSERT(pTopic);*/ if (pTopic == NULL) { mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic); continue; @@ -586,7 +632,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key); memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); - ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0); taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); @@ -606,7 +651,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { // if add more consumer to balanced subscribe, // possibly no vg is changed - /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped"); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6f8b0d8e04b06b39602134fc3f0711fd85edf787..14c3df38a7583f9e38acf936cebce78d7d6f77aa 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -276,8 +276,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SUB_OPTION, "Invalid subscribe option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_IN_REBALANCE, "Topic being rebalanced") // mnode-stream