diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 4a63cd3bb28cdbb31ad4f2ca7531787fccb7e7d4..ee317d0751c90f4f63669a228ce35f640b5bde1b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -67,7 +67,11 @@ void streamSchedByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - streamTaskInput(pTask, (SStreamQueueItem*)trigger); + if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) { + taosFreeQitem(trigger); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + return; + } streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 63a28e17ed7461d8b0eb95c8eece7f31f0f43710..53e49a6ba5e0be669329c5ff6414c3648ee14fb3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -28,6 +28,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { + taosMemoryFree(streamPath); goto _err; } @@ -58,7 +59,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF return pMeta; _err: - if (pMeta->path) taosMemoryFree(pMeta->path); + taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); @@ -250,6 +251,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + tdbFree(pKey); + tdbFree(pVal); return -1; } tDecoderInit(&decoder, (uint8_t*)pVal, vLen); @@ -257,10 +260,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { tDecoderClear(&decoder); if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { + tdbFree(pKey); + tdbFree(pVal); return -1; } if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + tdbFree(pKey); + tdbFree(pVal); return -1; } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ea590c57e66f615872eecee0f66dce581dca52f5..c0ed135d74343395be1147fa72d012e032cd1f2b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -63,7 +63,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int sprintf(statePath, "%s/%d", path, pTask->taskId); } else { memset(statePath, 0, 300); - strncpy(statePath, path, 300); + tstrncpy(statePath, path, 300); } if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { goto _err;