未验证 提交 347e78ec 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #22272 from taosdata/fix/TD-25466

Fix/td 25466
...@@ -537,6 +537,9 @@ static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) { ...@@ -537,6 +537,9 @@ static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
if (task->numWait == 0) { if (task->numWait == 0) {
taosThreadCondDestroy(task->done); taosThreadCondDestroy(task->done);
if (task->free) {
task->free(task->arg);
}
taosMemoryFree(task); taosMemoryFree(task);
} }
} }
...@@ -546,6 +549,9 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) { ...@@ -546,6 +549,9 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
taosThreadCondBroadcast(task->done); taosThreadCondBroadcast(task->done);
} else { } else {
taosThreadCondDestroy(task->done); taosThreadCondDestroy(task->done);
if (task->free) {
task->free(task->arg);
}
taosMemoryFree(task); taosMemoryFree(task);
} }
} }
...@@ -627,7 +633,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { ...@@ -627,7 +633,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, fs->tsdb, NULL); code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
break; break;
...@@ -774,8 +780,8 @@ static int32_t tsdbFSRunBgTask(void *arg) { ...@@ -774,8 +780,8 @@ static int32_t tsdbFSRunBgTask(void *arg) {
return 0; return 0;
} }
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *),
int64_t *taskid) { void *arg, int64_t *taskid) {
if (fs->stop) { if (fs->stop) {
return 0; // TODO: use a better error code return 0; // TODO: use a better error code
} }
...@@ -798,6 +804,7 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32 ...@@ -798,6 +804,7 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32
task->type = type; task->type = type;
task->run = run; task->run = run;
task->free = free;
task->arg = arg; task->arg = arg;
task->scheduleTime = taosGetTimestampMs(); task->scheduleTime = taosGetTimestampMs();
task->taskid = ++fs->taskid; task->taskid = ++fs->taskid;
...@@ -819,9 +826,10 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32 ...@@ -819,9 +826,10 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32
return 0; return 0;
} }
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid) { int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
int64_t *taskid) {
taosThreadMutexLock(fs->mutex); taosThreadMutexLock(fs->mutex);
int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, arg, taskid); int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid);
taosThreadMutexUnlock(fs->mutex); taosThreadMutexUnlock(fs->mutex);
return code; return code;
} }
......
...@@ -59,7 +59,8 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e ...@@ -59,7 +59,8 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditCommit(STFileSystem *fs);
int32_t tsdbFSEditAbort(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs);
// background task // background task
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid); int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
int64_t *taskid);
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid); int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs); int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSDisableBgTask(STFileSystem *fs);
...@@ -70,6 +71,7 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); ...@@ -70,6 +71,7 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
struct STFSBgTask { struct STFSBgTask {
EFSBgTaskT type; EFSBgTaskT type;
int32_t (*run)(void *arg); int32_t (*run)(void *arg);
void (*free)(void *arg);
void *arg; void *arg;
TdThreadCond done[1]; TdThreadCond done[1];
......
...@@ -125,6 +125,7 @@ _exit: ...@@ -125,6 +125,7 @@ _exit:
typedef struct { typedef struct {
STsdb *tsdb; STsdb *tsdb;
int32_t sync;
int64_t now; int64_t now;
} SRtnArg; } SRtnArg;
...@@ -251,28 +252,33 @@ _exit: ...@@ -251,28 +252,33 @@ _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} }
taosMemoryFree(arg);
return code; return code;
} }
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) { static void tsdbFreeRtnArg(void *arg) {
SRtnArg *rArg = (SRtnArg *)arg;
if (rArg->sync) {
tsem_post(&rArg->tsdb->pVnode->canCommit);
}
taosMemoryFree(arg);
}
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;
arg->tsdb = tsdb; arg->tsdb = tsdb;
arg->sync = sync;
arg->now = now; arg->now = now;
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, arg, taskid); if (sync) {
if (code) taosMemoryFree(arg); tsem_wait(&tsdb->pVnode->canCommit);
}
return code;
}
int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now) {
int64_t taskid; int64_t taskid;
int32_t code =
int32_t code = tsdbAsyncRetention(tsdb, now, &taskid); tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid);
if (code) return code; if (code) {
tsdbFreeRtnArg(arg);
return tsdbFSWaitBgTask(tsdb->pFS, taskid); }
return code;
} }
\ No newline at end of file
...@@ -15,27 +15,8 @@ ...@@ -15,27 +15,8 @@
#include "vnd.h" #include "vnd.h"
extern int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now); extern int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync);
extern int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid);
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) { int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
int32_t code; return tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
int32_t lino;
if (pVnode->config.sttTrigger == 1) {
tsem_wait(&pVnode->canCommit);
code = tsdbSyncRetention(pVnode->pTsdb, now);
TSDB_CHECK_CODE(code, lino, _exit);
// code = smaDoRetention(pVnode->pSma, now);
// TSDB_CHECK_CODE(code, lino, _exit);
tsem_post(&pVnode->canCommit);
} else {
int64_t taskid;
code = tsdbAsyncRetention(pVnode->pTsdb, now, &taskid);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
return code;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册