diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 1a0104945fa52c33d06be198501b97510ca481c9..6e7595c6ef29791811b23ce6d4a617ec077b6958 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -537,6 +537,9 @@ static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) { if (task->numWait == 0) { taosThreadCondDestroy(task->done); + if (task->free) { + task->free(task->arg); + } taosMemoryFree(task); } } @@ -546,6 +549,9 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) { taosThreadCondBroadcast(task->done); } else { taosThreadCondDestroy(task->done); + if (task->free) { + task->free(task->arg); + } taosMemoryFree(task); } } @@ -627,7 +633,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; - code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, fs->tsdb, NULL); + code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL); TSDB_CHECK_CODE(code, lino, _exit); break; @@ -774,8 +780,8 @@ static int32_t tsdbFSRunBgTask(void *arg) { return 0; } -static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, - int64_t *taskid) { +static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), + void *arg, int64_t *taskid) { if (fs->stop) { return 0; // TODO: use a better error code } @@ -798,6 +804,7 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32 task->type = type; task->run = run; + task->free = free; task->arg = arg; task->scheduleTime = taosGetTimestampMs(); task->taskid = ++fs->taskid; @@ -819,9 +826,10 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32 return 0; } -int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid) { +int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg, + int64_t *taskid) { taosThreadMutexLock(fs->mutex); - int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, arg, taskid); + int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid); taosThreadMutexUnlock(fs->mutex); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 8270581e58e23468fb1f6e782fc1015c27b2e71c..e814ab2fffb2a49b76fa2642efd309db0ddd7f2b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -59,7 +59,8 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs); // background task -int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid); +int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg, + int64_t *taskid); int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid); int32_t tsdbFSWaitAllBgTask(STFileSystem *fs); int32_t tsdbFSDisableBgTask(STFileSystem *fs); @@ -70,6 +71,7 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); struct STFSBgTask { EFSBgTaskT type; int32_t (*run)(void *arg); + void (*free)(void *arg); void *arg; TdThreadCond done[1]; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index ae5ac4ae3647012c39a20d9bee4cade563899b7a..3af9d2a07a4e1b31cd22d6de82b814b50167e13c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -125,6 +125,7 @@ _exit: typedef struct { STsdb *tsdb; + int32_t sync; int64_t now; } SRtnArg; @@ -251,28 +252,33 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } - taosMemoryFree(arg); return code; } -int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) { +static void tsdbFreeRtnArg(void *arg) { + SRtnArg *rArg = (SRtnArg *)arg; + if (rArg->sync) { + tsem_post(&rArg->tsdb->pVnode->canCommit); + } + taosMemoryFree(arg); +} + +int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY; - arg->tsdb = tsdb; + arg->sync = sync; arg->now = now; - int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, arg, taskid); - if (code) taosMemoryFree(arg); - - return code; -} + if (sync) { + tsem_wait(&tsdb->pVnode->canCommit); + } -int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now) { int64_t taskid; - - int32_t code = tsdbAsyncRetention(tsdb, now, &taskid); - if (code) return code; - - return tsdbFSWaitBgTask(tsdb->pFS, taskid); + int32_t code = + tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid); + if (code) { + tsdbFreeRtnArg(arg); + } + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c index 7af1f8e28f3de849a841effffdca279a2cbcef72..f3344d1d7d0f82cca3872a27d50b5a6ad2680e98 100644 --- a/source/dnode/vnode/src/vnd/vnodeRetention.c +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -15,27 +15,8 @@ #include "vnd.h" -extern int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now); -extern int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid); +extern int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync); int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) { - int32_t code; - int32_t lino; - - if (pVnode->config.sttTrigger == 1) { - tsem_wait(&pVnode->canCommit); - code = tsdbSyncRetention(pVnode->pTsdb, now); - TSDB_CHECK_CODE(code, lino, _exit); - - // code = smaDoRetention(pVnode->pSma, now); - // TSDB_CHECK_CODE(code, lino, _exit); - tsem_post(&pVnode->canCommit); - } else { - int64_t taskid; - code = tsdbAsyncRetention(pVnode->pTsdb, now, &taskid); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - return code; + return tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1); } \ No newline at end of file