diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 0b2b5ec35cab05fbddd1f9dd2e551010b475df50..6acf7fb98a25579fae5b416ea1bf7cd66ddb2eb2 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -247,6 +247,13 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebInfo; } +static void freeRebalanceItem(void* param) { + SMqRebInfo* pInfo = param; + taosArrayDestroy(pInfo->lostConsumers); + taosArrayDestroy(pInfo->newConsumers); + taosArrayDestroy(pInfo->removedConsumers); +} + static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSdb *pSdb = pMnode->pSdb; @@ -262,8 +269,21 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); + if (pRebMsg == NULL) { + mError("failed to create the rebalance msg, size:%d, quit mq timer", sizeof(SMqDoRebalanceMsg)); + mndRebEnd(); + return TSDB_CODE_OUT_OF_MEMORY; + } + pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); - // TODO set cleanfp + if (pRebMsg->rebSubHash == NULL) { + mError("failed to create rebalance hashmap"); + rpcFreeCont(pRebMsg); + mndRebEnd(); + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem); // iterate all consumers, find all modification while (1) { @@ -356,7 +376,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } else { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); - mDebug("mq rebalance finished, no modification"); + mDebug("mq timer finished, no need to re-balance"); mndRebEnd(); } return 0; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b8dbec2f846e7ea82fcf37aee33e4665dcacfdc1..739de68e5f758911bbfc714f90626c53f8278318 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -611,6 +611,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { break; } + // todo handle the malloc failure SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));