未验证 提交 bf2d8540 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #17355 from taosdata/feature/stream

fix(stream): memory leak
...@@ -972,6 +972,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -972,6 +972,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) goto FAIL; if (req.topicNames == NULL) goto FAIL;
tscDebug("call tmq subscribe, consumer: %ld, topic num %d", tmq->consumerId, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i); char* topic = taosArrayGetP(container, i);
...@@ -1297,7 +1299,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1297,7 +1299,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
pParam->code = code; pParam->code = code;
if (code != 0) { if (code != 0) {
tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async); tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId,
pParam->async, code);
goto END; goto END;
} }
......
...@@ -67,7 +67,11 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -67,7 +67,11 @@ void streamSchedByTimer(void* param, void* tmrId) {
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
streamTaskInput(pTask, (SStreamQueueItem*)trigger); if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
return;
}
streamSchedExec(pTask); streamSchedExec(pTask);
} }
......
...@@ -28,6 +28,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -28,6 +28,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(streamPath, "%s/%s", path, "stream"); sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = strdup(streamPath); pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
...@@ -58,7 +59,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -58,7 +59,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return pMeta; return pMeta;
_err: _err:
if (pMeta->path) taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
...@@ -250,6 +251,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { ...@@ -250,6 +251,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
tdbFree(pKey);
tdbFree(pVal);
return -1; return -1;
} }
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
...@@ -257,10 +260,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { ...@@ -257,10 +260,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
tdbFree(pKey);
tdbFree(pVal);
return -1; return -1;
} }
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey);
tdbFree(pVal);
return -1; return -1;
} }
} }
......
...@@ -63,7 +63,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -63,7 +63,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
sprintf(statePath, "%s/%d", path, pTask->taskId); sprintf(statePath, "%s/%d", path, pTask->taskId);
} else { } else {
memset(statePath, 0, 300); memset(statePath, 0, 300);
strncpy(statePath, path, 300); tstrncpy(statePath, path, 300);
} }
if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) {
goto _err; goto _err;
......
...@@ -302,7 +302,7 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py ...@@ -302,7 +302,7 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqCheckData1.py python3 ./test.py -f 7-tmq/tmqCheckData1.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 #python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
python3 ./test.py -f 7-tmq/tmqShow.py #python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqAlterSchema.py python3 ./test.py -f 7-tmq/tmqAlterSchema.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册