提交 ceb160c2 编写于 作者: H Haojun Liao

fix(query): fix memory leak.

上级 b444e8db
......@@ -247,6 +247,13 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebInfo;
}
static void freeRebalanceItem(void* param) {
SMqRebInfo* pInfo = param;
taosArrayDestroy(pInfo->lostConsumers);
taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers);
}
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SSdb *pSdb = pMnode->pSdb;
......@@ -262,8 +269,21 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
}
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
if (pRebMsg == NULL) {
mError("failed to create the rebalance msg, size:%d, quit mq timer", sizeof(SMqDoRebalanceMsg));
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY;
}
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
// TODO set cleanfp
if (pRebMsg->rebSubHash == NULL) {
mError("failed to create rebalance hashmap");
rpcFreeCont(pRebMsg);
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
// iterate all consumers, find all modification
while (1) {
......@@ -356,7 +376,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
mDebug("mq rebalance finished, no modification");
mDebug("mq timer finished, no need to re-balance");
mndRebEnd();
}
return 0;
......
......@@ -611,6 +611,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
break;
}
// todo handle the malloc failure
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册