未验证 提交 f9b08e5c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18984 from taosdata/fix/TD-21163-2

fix: mem leak td-21163
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
static int32_t msg_process(TAOS_RES* msg) { static int32_t msg_process(TAOS_RES* msg) {
char buf[1024]; char buf[1024];
...@@ -40,8 +40,8 @@ static int32_t msg_process(TAOS_RES* msg) { ...@@ -40,8 +40,8 @@ static int32_t msg_process(TAOS_RES* msg) {
TAOS_FIELD* fields = taos_fetch_fields(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
//int32_t* length = taos_fetch_lengths(msg); // int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg); int32_t precision = taos_result_precision(msg);
rows++; rows++;
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("precision: %d, row content: %s\n", precision, buf); printf("precision: %d, row content: %s\n", precision, buf);
...@@ -62,14 +62,12 @@ static int32_t init_env() { ...@@ -62,14 +62,12 @@ static int32_t init_env() {
pRes = taos_query(pConn, "drop topic topicname"); pRes = taos_query(pConn, "drop topic topicname");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists tmqdb"); pRes = taos_query(pConn, "drop database if exists tmqdb");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
...@@ -77,7 +75,7 @@ static int32_t init_env() { ...@@ -77,7 +75,7 @@ static int32_t init_env() {
pRes = taos_query(pConn, "create database tmqdb precision 'ns'"); pRes = taos_query(pConn, "create database tmqdb precision 'ns'");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
...@@ -87,7 +85,7 @@ static int32_t init_env() { ...@@ -87,7 +85,7 @@ static int32_t init_env() {
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
...@@ -96,28 +94,28 @@ static int32_t init_env() { ...@@ -96,28 +94,28 @@ static int32_t init_env() {
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
...@@ -126,33 +124,37 @@ static int32_t init_env() { ...@@ -126,33 +124,37 @@ static int32_t init_env() {
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
return 0; return 0;
END:
taos_free_result(pRes);
taos_close(pConn);
return -1;
} }
int32_t create_topic() { int32_t create_topic() {
......
...@@ -438,6 +438,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -438,6 +438,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pParam->pOffset); taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/ * pOffset->version);*/
...@@ -724,7 +725,10 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { ...@@ -724,7 +725,10 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
} }
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg && pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
return 0; return 0;
} }
...@@ -869,6 +873,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { ...@@ -869,6 +873,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
taosMemoryFree(pMsg->pEpSet);
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
return 0; return 0;
} }
...@@ -1166,6 +1172,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1166,6 +1172,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0) { if (code != 0) {
tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr()); tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr());
if (pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
goto CREATE_MSG_FAIL; goto CREATE_MSG_FAIL;
...@@ -1365,6 +1373,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1365,6 +1373,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pParam); taosMemoryFree(pParam);
} }
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1; return -1;
} }
...@@ -1416,6 +1425,8 @@ END: ...@@ -1416,6 +1425,8 @@ END:
} else { } else {
taosMemoryFree(pParam); taosMemoryFree(pParam);
} }
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
return code; return code;
} }
......
...@@ -456,6 +456,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -456,6 +456,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
if (head->isHbParam) { if (head->isHbParam) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
......
...@@ -599,6 +599,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { ...@@ -599,6 +599,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) {
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
conn->refId = exh->refId; conn->refId = exh->refId;
if (conn->refId == -1) {
taosMemoryFree(exh);
}
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册