提交 8ceae585 编写于 作者: L Liu Jicong

fix(tmq): fix memory leak

上级 d8743b72
......@@ -135,16 +135,18 @@ FAIL:
}
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) {
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebInfo == NULL) {
pRebInfo = tNewSMqRebSubscribe(key);
if (pRebInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
taosMemoryFree(pRebInfo);
pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
}
return pRebSub;
return pRebInfo;
}
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
......@@ -305,8 +307,10 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
ASSERT(pTopic);
taosRLockLatch(&pTopic->lock);
topicEp.schema.nCols = pTopic->schema.nCols;
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
if (topicEp.schema.nCols) {
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
}
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
......@@ -517,6 +521,7 @@ SUBSCRIBE_OVER:
}
if (pConsumerNew) {
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
}
// TODO: replace with destroy subscribe msg
if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
......
......@@ -502,9 +502,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
......@@ -547,6 +547,16 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.touchedConsumers);
taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub);
}
// reset flag
......
......@@ -72,8 +72,15 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
int32_t schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
void *swBuf = NULL;
int32_t physicalPlanLen = 0;
if (pTopic->physicalPlan) {
physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
}
int32_t schemaLen = 0;
if (pTopic->schema.nCols) {
taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
}
int32_t size =
sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
......@@ -96,18 +103,24 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
if (pTopic->astLen) {
SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
}
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
void *swBuf = taosMemoryMalloc(schemaLen);
if (swBuf == NULL) {
goto TOPIC_ENCODE_OVER;
if (physicalPlanLen) {
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
}
void *aswBuf = swBuf;
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
if (schemaLen) {
swBuf = taosMemoryMalloc(schemaLen);
if (swBuf == NULL) {
goto TOPIC_ENCODE_OVER;
}
void *aswBuf = swBuf;
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
}
SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
......@@ -116,6 +129,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_SUCCESS;
TOPIC_ENCODE_OVER:
if (swBuf) taosMemoryFree(swBuf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -168,29 +182,43 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
if (pTopic->ast == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
if (pTopic->astLen) {
pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
if (pTopic->ast == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
} else {
pTopic->ast = NULL;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
if (pTopic->physicalPlan == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
if (len) {
pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
if (pTopic->physicalPlan == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
} else {
pTopic->physicalPlan = NULL;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
void *buf = taosMemoryMalloc(len);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
goto TOPIC_DECODE_OVER;
if (len) {
void *buf = taosMemoryMalloc(len);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
goto TOPIC_DECODE_OVER;
}
} else {
pTopic->schema.nCols = 0;
pTopic->schema.sver = 0;
pTopic->schema.pSchema = NULL;
}
SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);
......@@ -340,9 +368,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
return -1;
}
} else {
topicObj.ast = strdup("");
topicObj.astLen = 1;
topicObj.physicalPlan = strdup("");
topicObj.ast = NULL;
topicObj.astLen = 0;
topicObj.physicalPlan = NULL;
topicObj.subType = TOPIC_SUB_TYPE__DB;
topicObj.withTbName = 1;
topicObj.withSchema = 1;
......
......@@ -613,6 +613,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset++;
}
taosMemoryFree(pHeadWithCkSum);
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册