提交 d507678b 编写于 作者: L Liu Jicong

refactor(stream): set state buffer size

上级 b3b0e5a5
...@@ -36,7 +36,7 @@ typedef struct { ...@@ -36,7 +36,7 @@ typedef struct {
int32_t number; int32_t number;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState); void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState);
......
...@@ -116,11 +116,13 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { ...@@ -116,11 +116,13 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
mndTransDrop(pTrans);
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
mndTransDrop(pTrans);
return 0; return 0;
FAIL: FAIL:
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
......
...@@ -154,6 +154,8 @@ TOPIC_ENCODE_OVER: ...@@ -154,6 +154,8 @@ TOPIC_ENCODE_OVER:
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
...@@ -215,17 +217,15 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -215,17 +217,15 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
if (len) { if (len) {
void *buf = taosMemoryMalloc(len); buf = taosMemoryMalloc(len);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
taosMemoryFree(buf);
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
taosMemoryFree(buf);
} else { } else {
pTopic->schema.nCols = 0; pTopic->schema.nCols = 0;
pTopic->schema.version = 0; pTopic->schema.version = 0;
...@@ -251,6 +251,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -251,6 +251,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
TOPIC_DECODE_OVER: TOPIC_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr()); mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
......
...@@ -321,7 +321,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -321,7 +321,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
taosMemoryFree(s); taosMemoryFree(s);
} }
pStreamState = streamStateOpen(taskInfDir, NULL, true); pStreamState = streamStateOpen(taskInfDir, NULL, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -890,7 +890,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -890,7 +890,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor // expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
} }
...@@ -904,7 +904,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -904,7 +904,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) { } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
} }
......
...@@ -177,6 +177,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { ...@@ -177,6 +177,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdbFree(pKey);
tdbTbcClose(pCur); tdbTbcClose(pCur);
return -1; return -1;
} }
......
...@@ -49,7 +49,9 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, ...@@ -49,7 +49,9 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
return 0; return 0;
} }
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
szPage = szPage < 0 ? 4096 : szPage;
pages = pages < 0 ? 256 : pages;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -63,7 +65,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { ...@@ -63,7 +65,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) {
memset(statePath, 0, 300); memset(statePath, 0, 300);
strncpy(statePath, path, 300); strncpy(statePath, path, 300);
} }
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) {
goto _err; goto _err;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册