From 015c00aaca11002a77f99848ee655301353787da Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 16 Nov 2022 11:16:32 +0800 Subject: [PATCH] fix(stream): timer refer stream task --- source/libs/stream/src/stream.c | 3 +++ source/libs/stream/src/streamMeta.c | 9 +++++++++ source/libs/wal/src/walWrite.c | 12 ++++++++---- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index e6d5859163..79549675a3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -51,6 +51,7 @@ void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + streamMetaReleaseTask(NULL, pTask); return; } @@ -80,6 +81,8 @@ void streamSchedByTimer(void* param, void* tmrId) { int32_t streamSetupTrigger(SStreamTask* pTask) { if (pTask->triggerParam != 0) { + int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + ASSERT(ref == 2); pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ec8828c05..a864814a74 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -80,7 +80,12 @@ void streamMetaClose(SStreamMeta* pMeta) { pIter = taosHashIterate(pMeta->pTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->timer) { + taosTmrStop(pTask->timer); + pTask->timer = NULL; + } tFreeSStreamTask(pTask); + /*streamMetaReleaseTask(pMeta, pTask);*/ } taosHashCleanup(pMeta->pTasks); taosMemoryFree(pMeta->path); @@ -202,6 +207,10 @@ void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) { if (ppTask) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); + /*if (pTask->timer) { + * taosTmrStop(pTask->timer);*/ + /*pTask->timer = NULL;*/ + /*}*/ atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); taosWLockLatch(&pMeta->lock); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 34195f0193..0bc76e4084 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -102,6 +102,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) { taosThreadMutexLock(&pWal->mutex); + wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); int64_t code; char fnameStr[WAL_FILE_LEN]; if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { @@ -123,8 +124,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); for (int i = pWal->writeCur + 1; i < fileSetSize; i++) { walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); + wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); + wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); } // pop from fileInfoSet @@ -157,6 +160,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); + wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { // TODO terrno = TAOS_SYSTEM_ERROR(errno); @@ -324,9 +328,9 @@ int32_t walEndSnapshot(SWal *pWal) { pInfo++; } if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) { - wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); + wDebug("vgId:%d, wal end remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); } else { - wDebug("vgId:%d, no remove", pWal->cfg.vgId); + wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { @@ -343,12 +347,12 @@ int32_t walEndSnapshot(SWal *pWal) { for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); + wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0) { goto UPDATE_META; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); + wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0) { ASSERT(0); } -- GitLab