diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index c2b3a7977c850d07feeed5af9fe71f7006fdc893..9445a0baa5302a7529e8d2d75ca89db6b96d6374 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -36,7 +36,7 @@ typedef struct { int32_t number; } SStreamState; -SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7ffa3cef9992d998f36f34392c0f45d1fdc9d955..46aaa7ffe16c9d684229480ac88a29e0469987b3 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -116,11 +116,13 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - mndTransDrop(pTrans); tDeleteSMqConsumerObj(pConsumerNew); + taosMemoryFree(pConsumerNew); + mndTransDrop(pTrans); return 0; FAIL: tDeleteSMqConsumerObj(pConsumerNew); + taosMemoryFree(pConsumerNew); mndTransDrop(pTrans); return -1; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ae259b95be220a3c1cb0d3c2178b92e322d65ace..90ac69423c5f496bebc156b290264e990cacd615 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -154,6 +154,8 @@ TOPIC_ENCODE_OVER: SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; + + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER; @@ -215,17 +217,15 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); if (len) { - void *buf = taosMemoryMalloc(len); + buf = taosMemoryMalloc(len); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; } SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { - taosMemoryFree(buf); goto TOPIC_DECODE_OVER; } - taosMemoryFree(buf); } else { pTopic->schema.nCols = 0; pTopic->schema.version = 0; @@ -251,6 +251,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_SUCCESS; TOPIC_DECODE_OVER: + taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr()); taosMemoryFreeClear(pRow); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 155086a5a8fd771447d465d5a1685b280823924a..722a3f479e47c5e19fa3419fc244accea5282df6 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -321,7 +321,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } taosMemoryFree(s); } - pStreamState = streamStateOpen(taskInfDir, NULL, true); + pStreamState = streamStateOpen(taskInfDir, NULL, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 83cb3955e3c15b6b83ff8f52a5f458a52ffc0faf..59a9f326427a64904bda2c5d86b6619587be2b47 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -890,7 +890,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { // expand executor if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { return -1; } @@ -904,7 +904,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { return -1; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 1c5eee73785841eb23debb0909ebd8fd1b12ac2e..f8343459effd71f02a0fe729fe9182bf211065cd 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -177,6 +177,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { tDecoderClear(&decoder); if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tdbFree(pKey); tdbTbcClose(pCur); return -1; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 85592881cd9b160077fa6d8c8ada783db6ef6cfd..ea590c57e66f615872eecee0f66dce581dca52f5 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -49,7 +49,9 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, return 0; } -SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { + szPage = szPage < 0 ? 4096 : szPage; + pages = pages < 0 ? 256 : pages; SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -63,7 +65,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { memset(statePath, 0, 300); strncpy(statePath, path, 300); } - if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { + if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { goto _err; }