From 5b25920fec087cdda4245d8a2d6994fa756d6f2b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 May 2023 23:19:32 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndConsumer.c | 65 +++++++++++++--------- source/dnode/mnode/impl/src/mndSubscribe.c | 31 ++++++----- 2 files changed, 57 insertions(+), 39 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 16ed158fed..ffb343ef22 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -672,7 +672,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; } else { - /*taosRLockLatch(&pExistedConsumer->lock);*/ int32_t status = atomic_load_32(&pExistedConsumer->status); mInfo("receive subscribe request from existed consumer:0x%" PRIx64 @@ -881,7 +880,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) { + for (int32_t i = 0; i < size; i++) { char *p = taosArrayGetP(pConsumer->rebNewTopics, i); if (strcmp(pTopic, p) == 0) { taosArrayRemove(pConsumer->rebNewTopics, i); @@ -902,9 +901,42 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo if (strcmp(pTopic, p) == 0) { taosArrayRemove(pConsumer->rebRemovedTopics, i); taosMemoryFree(p); + + mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", + pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); + break; + } + } +} + +static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { + int32_t sz = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < sz; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + if (strcmp(pTopic, topic) == 0) { + taosArrayRemove(pConsumer->currentTopics, i); + taosMemoryFree(topic); + + mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", + pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); + break; + } + } +} + +static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) { + bool existing = false; + int32_t size = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < size; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + + if (strcmp(topic, pTopic) == 0) { + existing = true; break; } } + + return existing; } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { @@ -951,24 +983,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); - // not exist in current topic - bool existing = false; - int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics); - for (int32_t i = 0; i < numOfExistedTopics; i++) { - char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - if (strcmp(topic, pNewTopic) == 0) { - existing = true; - } - } - + // check if exist in current topic removeFromNewTopicList(pOldConsumer, pNewTopic); // add to current topic - if (!existing) { + bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); + if (existing) { + taosMemoryFree(pNewTopic); + } else { // added into current topic list taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); - } else { - taosMemoryFree(pNewTopic); } // set status @@ -993,16 +1017,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, removeFromRemoveTopicList(pOldConsumer, removedTopic); // remove from current topic - int32_t i = 0; - int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - for (i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - if (strcmp(removedTopic, topic) == 0) { - taosArrayRemove(pOldConsumer->currentTopics, i); - taosMemoryFree(topic); - break; - } - } + removeFromCurrentTopicList(pOldConsumer, removedTopic); // set status int32_t status = pOldConsumer->status; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 015c497de1..b3cf6c9701 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -213,13 +213,9 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - SMqRebOutputVg outputVg = { - .oldConsumerId = consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId); } @@ -584,16 +580,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. while (1) { -// if (rebalanceOnce) { -// break; -// } - pIter = taosHashIterate(pReq->rebSubHash, pIter); if (pIter == NULL) { break; } - // todo handle the malloc failure SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); @@ -601,6 +592,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); + if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL || + rebOutput.rebVgs == NULL) { + taosArrayDestroy(rebOutput.newConsumers); + taosArrayDestroy(rebOutput.removedConsumers); + taosArrayDestroy(rebOutput.modifyConsumers); + taosArrayDestroy(rebOutput.rebVgs); + + terrno = TSDB_CODE_OUT_OF_MEMORY; + mInfo("mq re-balance failed, due to out of memory"); + taosHashCleanup(pReq->rebSubHash); + mndRebEnd(); + return -1; + } + SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); @@ -640,6 +645,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput.pSub = tCloneSubscribeObj(pSub); taosRUnLockLatch(&pSub->lock); + mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); } @@ -661,9 +667,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { taosArrayDestroy(rebOutput.rebVgs); tDeleteSubscribeObj(rebOutput.pSub); taosMemoryFree(rebOutput.pSub); - -// taosSsleep(100); -// rebalanceOnce = true; } // reset flag -- GitLab