提交 88556595 编写于 作者: H Hongze Cheng

more code

上级 fbc4cc90
......@@ -48,15 +48,23 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
tsem_init(&fs[0]->canEdit, 0, 1);
fs[0]->state = TSDB_FS_STATE_NONE;
fs[0]->neid = 0;
fs[0]->mergeTaskOn = false;
// background task queue
taosThreadMutexInit(fs[0]->mutex, NULL);
fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue;
fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue;
return 0;
static int32_t destroy_fs(STFileSystem **fs) {
if (fs[0] == NULL) return 0;
ASSERT(fs[0]->bgTaskNum == 0);
TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
......@@ -595,25 +603,18 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
code = commit_edit(fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (fs->etype == TSDB_FEDIT_MERGE) {
fs->mergeTaskOn = false;
// check if need to merge
if (fs->tsdb->pVnode->config.sttTrigger > 1 && fs->mergeTaskOn == false) {
// schedule merge
if (fs->tsdb->pVnode->config.sttTrigger != 1) {
STFileSet *fset;
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
SSttLvl *lvl0 = TARRAY2_FIRST(fset->lvlArr);
if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb);
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, fs->tsdb, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
fs->mergeTaskOn = true;
......@@ -707,4 +708,135 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
fsetArr[0] = NULL;
return 0;
const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"};
static int32_t tsdbFSRunBgTask(void *arg) {
STFileSystem *fs = (STFileSystem *)arg;
ASSERT(fs->bgTaskRunning != NULL);
fs->bgTaskRunning->launchTime = taosGetTimestampMs();
fs->bgTaskRunning->finishTime = taosGetTimestampMs();
tsdbDebug("vgId:%d bg task:%s finished, schedule time:%" PRId64 " launch time:%" PRId64 " finish time:%" PRId64,
TD_VID(fs->tsdb->pVnode), gFSBgTaskName[fs->bgTaskRunning->type], fs->bgTaskRunning->scheduleTime,
fs->bgTaskRunning->launchTime, fs->bgTaskRunning->finishTime);
// free last
if (fs->bgTaskRunning->numWait > 0) {
} else {
fs->bgTaskRunning = NULL;
// schedule next
if (fs->bgTaskNum > 0) {
// pop task from head
fs->bgTaskRunning = fs->bgTaskQueue->next;
fs->bgTaskRunning->prev->next = fs->bgTaskRunning->next;
fs->bgTaskRunning->next->prev = fs->bgTaskRunning->prev;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, arg);
return 0;
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg,
int64_t *taskid) {
// check if same task is on
if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) {
return 0;
for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) {
if (task->type == type) {
return 0;
// do schedule task
STFSBgTask *task = taosMemoryCalloc(1, sizeof(STFSBgTask));
if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY;
taosThreadCondInit(task->done, NULL);
task->type = type;
task->run = run;
task->arg = arg;
task->scheduleTime = taosGetTimestampMs();
task->taskid = ++fs->taskid;
if (fs->bgTaskRunning == NULL && fs->bgTaskNum == 0) {
// launch task directly
fs->bgTaskRunning = task;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fs);
} else {
// add to the queue tail
task->next = fs->bgTaskQueue;
task->prev = fs->bgTaskQueue->prev;
task->prev->next = task;
task->next->prev = task;
if (taskid) *taskid = task->taskid;
return 0;
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid) {
int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, arg, taskid);
return code;
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid) {
STFSBgTask *task = NULL;
if (fs->bgTaskRunning && fs->bgTaskRunning->taskid == taskid) {
task = fs->bgTaskRunning;
} else {
for (STFSBgTask *taskt = fs->bgTaskQueue->next; taskt != fs->bgTaskQueue; taskt = taskt->next) {
if (taskt->taskid == taskid) {
task = taskt;
if (task) {
taosThreadCondWait(task->done, fs->mutex);
if (task->numWait == 0) {
return 0;
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) {
while (fs->bgTaskRunning) {
taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex);
return 0;
\ No newline at end of file
......@@ -24,6 +24,7 @@ extern "C" {
/* Exposed Handle */
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask;
typedef TARRAY2(STFileSet *) TFileSetArray;
typedef enum {
......@@ -31,6 +32,12 @@ typedef enum {
} EFEditT;
typedef enum {
} EFSBgTaskT;
/* Exposed APIs */
// open/close
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
......@@ -45,9 +52,30 @@ int64_t tsdbFSAllocEid(STFileSystem *fs);
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
int32_t tsdbFSEditCommit(STFileSystem *fs);
int32_t tsdbFSEditAbort(STFileSystem *fs);
// background task
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid);
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
struct STFSBgTask {
EFSBgTaskT type;
int32_t (*run)(void *arg);
void *arg;
TdThreadCond done[1];
int32_t numWait;
int64_t taskid;
int64_t scheduleTime;
int64_t launchTime;
int64_t finishTime;
struct STFSBgTask *prev;
struct STFSBgTask *next;
/* Exposed Structs */
struct STFileSystem {
STsdb *tsdb;
......@@ -55,9 +83,15 @@ struct STFileSystem {
int32_t state;
int64_t neid;
EFEditT etype;
bool mergeTaskOn;
TFileSetArray fSetArr[1];
TFileSetArray fSetArrTmp[1];
// background task queue
TdThreadMutex mutex[1];
int64_t taskid;
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
#ifdef __cplusplus
......@@ -1074,7 +1074,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
double el = (taosGetTimestampUs() - st) / 1000.0;
"load block of %ld tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles,
sizeInDisk / 1000.0, el, pReader->idStr);
......@@ -16,105 +16,6 @@
#include "tsdb.h"
#include "tsdbFS2.h"
static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (expLevel == pSet->diskId.level) continue;
if (expLevel < 0) {
return true;
} else {
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
return false;
if (did.level == pSet->diskId.level) continue;
return true;
return false;
bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
bool should;
should = tsdbShouldDoRetentionImpl(pTsdb, now);
return should;
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0;
int32_t lino = 0;
STsdbFS fs = {0};
code = tsdbFSCopy(pTsdb, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (expLevel < 0) {
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
taosArrayRemove(fs.aDFileSet, iSet);
} else {
if (expLevel == 0) continue;
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
goto _exit;
if (did.level == pSet->diskId.level) continue;
// copy file to new disk (todo)
SDFileSet fSet = *pSet;
fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertFSet(&fs, &fSet);
TSDB_CHECK_CODE(code, lino, _exit);
// do change fs
code = tsdbFSPrepareCommit(pTsdb, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
return code;
static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) { return tsdbFSCommit(pTsdb); }
int32_t tsdbCommitRetention(STsdb *pTsdb) {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
return 0;
// new ==============
typedef struct {
STsdb *tsdb;
int32_t szPage;
......@@ -128,19 +29,19 @@ typedef struct {
int32_t fsetArrIdx;
STFileSet *fset;
} ctx[1];
} SRTXer;
} SRTNer;
static int32_t tsdbDoRemoveFileObject(SRTXer *rtxer, const STFileObj *fobj) {
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
return TARRAY2_APPEND(rtxer->fopArr, op);
return TARRAY2_APPEND(rtner->fopArr, op);
static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile *to) {
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
int32_t code = 0;
int32_t lino = 0;
......@@ -148,7 +49,7 @@ static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile
TdFilePtr fdFrom = NULL;
TdFilePtr fdTo = NULL;
tsdbTFileName(rtxer->tsdb, to, fname);
tsdbTFileName(rtner->tsdb, to, fname);
fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
if (fdFrom == NULL) code = terrno;
......@@ -158,7 +59,7 @@ static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile
if (fdTo == NULL) code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtxer->szPage));
int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -168,14 +69,14 @@ static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile
if (code) {
TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
return code;
static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const SDiskID *did) {
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
int32_t code = 0;
int32_t lino = 0;
STFileOp op = {0};
......@@ -187,7 +88,7 @@ static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const
.of = fobj->f[0],
code = TARRAY2_APPEND(rtxer->fopArr, op);
code = TARRAY2_APPEND(rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// create new
......@@ -199,7 +100,7 @@ static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const
.type = fobj->f->type,
.did = did[0],
.fid = fobj->f->fid,
.cid = rtxer->cid,
.cid = rtner->cid,
.size = fobj->f->size,
.stt[0] =
......@@ -208,101 +109,105 @@ static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const
code = TARRAY2_APPEND(rtxer->fopArr, op);
code = TARRAY2_APPEND(rtner->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
// do copy the file
code = tsdbDoCopyFile(rtxer, fobj, &op.nf);
code = tsdbDoCopyFile(rtner, fobj, &op.nf);
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
return code;
static int32_t tsdbDoRetentionBegin(STsdb *tsdb, SRTXer *rtxer) {
typedef struct {
STsdb *tsdb;
int64_t now;
} SRtnArg;
static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
int32_t code = 0;
int32_t lino = 0;
// TODO: wait for merge and compact task done
STsdb *tsdb = arg->tsdb;
rtxer->tsdb = tsdb;
rtxer->szPage = tsdb->pVnode->config.tsdbPageSize;
rtxer->now = taosGetTimestampMs();
rtxer->cid = tsdbFSAllocEid(tsdb->pFS);
rtner->tsdb = tsdb;
rtner->szPage = tsdb->pVnode->config.tsdbPageSize;
rtner->now = arg->now;
rtner->cid = tsdbFSAllocEid(tsdb->pFS);
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtxer->fsetArr);
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} else {
tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtxer->tsdb->pVnode), rtxer->cid, __func__);
tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
return code;
static int32_t tsdbDoRetentionEnd(SRTXer *rtxer) {
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
int32_t code = 0;
int32_t lino = 0;
if (TARRAY2_SIZE(rtxer->fopArr) == 0) goto _exit;
if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit;
code = tsdbFSEditBegin(rtxer->tsdb->pFS, rtxer->fopArr, TSDB_FEDIT_MERGE);
code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSEditCommit(rtxer->tsdb->pFS);
code = tsdbFSEditCommit(rtner->tsdb->pFS);
if (code) {
TSDB_CHECK_CODE(code, lino, _exit);
TARRAY2_DESTROY(rtxer->fopArr, NULL);
TARRAY2_DESTROY(rtner->fopArr, NULL);
if (code) {
TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} else {
tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtxer->tsdb->pVnode), rtxer->cid, __func__);
tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
return code;
static int32_t tsdbDoRetention2(STsdb *tsdb) {
static int32_t tsdbDoRetention2(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
SRTXer rtxer[1] = {0};
code = tsdbDoRetentionBegin(tsdb, rtxer);
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
while (rtxer->ctx->fsetArrIdx < TARRAY2_SIZE(rtxer->fsetArr)) {
rtxer->ctx->fset = TARRAY2_GET(rtxer->fsetArr, rtxer->ctx->fsetArrIdx);
while (rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr)) {
rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx);
STFileObj *fobj;
int32_t expLevel = tsdbFidLevel(rtxer->ctx->fset->fid, &rtxer->tsdb->keepCfg, rtxer->now);
int32_t expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now);
if (expLevel < 0) { // remove the file set
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtxer->ctx->fset->farr[ftype], 1); ++ftype) {
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
code = tsdbDoRemoveFileObject(rtxer, fobj);
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
SSttLvl *lvl;
TARRAY2_FOREACH(rtxer->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbDoRemoveFileObject(rtxer, fobj);
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -311,39 +216,62 @@ static int32_t tsdbDoRetention2(STsdb *tsdb) {
} else {
SDiskID did;
if (tfsAllocDisk(rtxer->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
// data
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtxer->ctx->fset->farr[ftype], 1); ++ftype) {
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
if (fobj->f->did.level == did.level) continue;
code = tsdbDoMigrateFileObj(rtxer, fobj, &did);
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
// stt
SSttLvl *lvl;
TARRAY2_FOREACH(rtxer->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (fobj->f->did.level == did.level) continue;
code = tsdbDoMigrateFileObj(rtxer, fobj, &did);
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDoRetentionEnd(rtxer);
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code);
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
return code;
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) {
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;
arg->tsdb = tsdb;
arg->now = now;
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, arg, taskid);
if (code) taosMemoryFree(arg);
return code;
int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now) {
int64_t taskid;
int32_t code = tsdbAsyncRetention(tsdb, now, &taskid);
if (code) return code;
return tsdbFSWaitBgTask(tsdb->pFS, taskid);
\ No newline at end of file
......@@ -528,25 +528,25 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
*maxKey = *minKey + tsTickPerMin[precision] * minutes - 1;
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) {
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec) {
int32_t aFid[3];
TSKEY key;
if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
now = now * 1000;
nowSec = nowSec * 1000;
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
now = now * 1000000l;
nowSec = nowSec * 1000000l;
} else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
now = now * 1000000000l;
nowSec = nowSec * 1000000000l;
} else {
key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
key = nowSec - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision];
aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
key = nowSec - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision];
aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
key = now - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
key = nowSec - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision];
aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
if (fid >= aFid[0]) {
......@@ -15,116 +15,16 @@
#include "vnd.h"
typedef struct {
SVnode *pVnode;
int64_t now;
int64_t commitID;
SVnodeInfo info;
} SRetentionInfo;
extern int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now);
extern bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbCommitRetention(STsdb *pTsdb);
static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
int32_t vnodeSyncRetention(SVnode *pVnode, int64_t now) {
int32_t code;
if (pVnode->config.sttTrigger == 1) {
code = tsdbSyncRetention(pVnode->pTsdb, now);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
code = tsdbSyncRetention(pVnode->pTsdb, now);
return code;
static int32_t vnodeRetentionTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
SRetentionInfo *pInfo = (SRetentionInfo *)param;
SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
// save info
pInfo->info.state.commitID = pInfo->commitID;
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
// do job
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info
// commit sub-job
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
return code;
int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) {
int32_t code = 0;
int32_t lino = 0;
if (!tsdbShouldDoRetention(pVnode->pTsdb, now)) return code;
SRetentionInfo *pInfo = (SRetentionInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
TSDB_CHECK_CODE(code, lino, _exit);
pInfo->pVnode = pVnode;
pInfo->now = now;
code = vnodePrepareRentention(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeRetentionTask, pInfo);
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
return 0;
\ No newline at end of file
......@@ -509,7 +509,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) {
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME ||
pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
......@@ -565,8 +567,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true);
return vnodeGetBatchMeta(pVnode, pMsg);
// return tqProcessPollReq(pVnode->pTq, pMsg);
// return tqProcessPollReq(pVnode->pTq, pMsg);
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
......@@ -609,7 +611,9 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now);
extern int32_t vnodeSyncRetention(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SVTrimDbReq trimReq = {0};
......@@ -622,10 +626,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int3
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
// process
vnodeAsyncRentention(pVnode, trimReq.timestamp);
code = vnodeSyncRetention(pVnode, trimReq.timestamp);
return code;
......@@ -650,7 +651,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
vnodeAsyncRentention(pVnode, ttlReq.timestamp);
vnodeSyncRetention(pVnode, ttlReq.timestamp);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册