未验证 提交 78ff5f75 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #17663 from taosdata/feature/stream

fix: memory leak
...@@ -47,7 +47,7 @@ static int32_t msg_process(TAOS_RES* msg) { ...@@ -47,7 +47,7 @@ static int32_t msg_process(TAOS_RES* msg) {
int32_t precision = taos_result_precision(msg); int32_t precision = taos_result_precision(msg);
rows++; rows++;
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("row content: %s\n", buf); printf("precision: %d, row content: %s\n", precision, buf);
} }
return rows; return rows;
...@@ -70,7 +70,7 @@ static int32_t init_env() { ...@@ -70,7 +70,7 @@ static int32_t init_env() {
taos_free_result(pRes); taos_free_result(pRes);
// create database // create database
pRes = taos_query(pConn, "create database tmqdb"); pRes = taos_query(pConn, "create database tmqdb precision 'ns'");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -1619,7 +1619,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1619,7 +1619,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) { if (rspWrapper == NULL) {
tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId); /*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/
return NULL; return NULL;
} }
} }
......
...@@ -5988,7 +5988,11 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { ...@@ -5988,7 +5988,11 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (pRsp->withSchema) { if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) return -1; if (pSW == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1; if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) {
taosMemoryFree(pSW);
return -1;
}
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
...@@ -6069,7 +6073,10 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { ...@@ -6069,7 +6073,10 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (pRsp->withSchema) { if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) return -1; if (pSW == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1; if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) {
taosMemoryFree(pSW);
return -1;
}
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
......
...@@ -155,7 +155,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -155,7 +155,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
......
...@@ -433,16 +433,9 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su ...@@ -433,16 +433,9 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
} }
#endif #endif
if (subType == TOPIC_SUB_TYPE__COLUMN) { ASSERT(subType == TOPIC_SUB_TYPE__COLUMN);
pRsp->withSchema = false; pRsp->withSchema = false;
} else {
pRsp->withSchema = true;
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
if (pRsp->blockSchema == NULL) {
// TODO free
return -1;
}
}
return 0; return 0;
} }
......
...@@ -15,14 +15,14 @@ ...@@ -15,14 +15,14 @@
#include "tq.h" #include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) return -1;
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = 0; pRetrieve->useconds = 0;
pRetrieve->precision = TSDB_DEFAULT_PRECISION; pRetrieve->precision = precision;
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
...@@ -95,7 +95,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -95,7 +95,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
break; break;
} }
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++; pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
...@@ -174,7 +174,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -174,7 +174,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
} }
} }
tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++; pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
continue; continue;
...@@ -256,7 +257,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -256,7 +257,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(&block); blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
pRsp->blockNum++; pRsp->blockNum++;
...@@ -291,7 +293,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -291,7 +293,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
pRsp->createTableNum++; pRsp->createTableNum++;
} }
} }
tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
blockDataFreeRes(&block); blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
pRsp->blockNum++; pRsp->blockNum++;
......
...@@ -270,7 +270,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -270,7 +270,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
break; break;
} }
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++; pRsp->blockNum++;
} }
......
...@@ -32,6 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) { ...@@ -32,6 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) {
return pRef; return pRef;
} }
#if 0
void walCloseRef(SWal *pWal, int64_t refId) { void walCloseRef(SWal *pWal, int64_t refId) {
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
if (ppRef == NULL) return; if (ppRef == NULL) return;
...@@ -39,6 +40,7 @@ void walCloseRef(SWal *pWal, int64_t refId) { ...@@ -39,6 +40,7 @@ void walCloseRef(SWal *pWal, int64_t refId) {
taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
taosMemoryFree(pRef); taosMemoryFree(pRef);
} }
#endif
int32_t walRefVer(SWalRef *pRef, int64_t ver) { int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal; SWal *pWal = pRef->pWal;
...@@ -65,10 +67,12 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { ...@@ -65,10 +67,12 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
return 0; return 0;
} }
#if 0
void walUnrefVer(SWalRef *pRef) { void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1; pRef->refId = -1;
pRef->refFile = -1; pRef->refFile = -1;
} }
#endif
SWalRef *walRefCommittedVer(SWal *pWal) { SWalRef *walRefCommittedVer(SWal *pWal) {
SWalRef *pRef = walOpenRef(pWal); SWalRef *pRef = walOpenRef(pWal);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
#if 0
static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
int64_t code = 0; int64_t code = 0;
...@@ -47,6 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { ...@@ -47,6 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
} }
return 0; return 0;
} }
#endif
int walInitWriteFile(SWal* pWal) { int walInitWriteFile(SWal* pWal) {
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
...@@ -134,6 +136,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { ...@@ -134,6 +136,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
return fileFirstVer; return fileFirstVer;
} }
#if 0
int walSeekWriteVer(SWal* pWal, int64_t ver) { int walSeekWriteVer(SWal* pWal, int64_t ver) {
int64_t code; int64_t code;
if (ver == pWal->vers.lastVer) { if (ver == pWal->vers.lastVer) {
...@@ -158,3 +161,4 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) { ...@@ -158,3 +161,4 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) {
return 0; return 0;
} }
#endif
...@@ -86,7 +86,7 @@ class TDTestCase: ...@@ -86,7 +86,7 @@ class TDTestCase:
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 10, 'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 15, 'pollDelay': 25,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 1}
...@@ -157,7 +157,7 @@ class TDTestCase: ...@@ -157,7 +157,7 @@ class TDTestCase:
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 10, 'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10, 'pollDelay': 25,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 1}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册