提交 0841442d 编写于 作者: C Cary Xu

feat: logic optimization for data migration

上级 27799576
......@@ -322,6 +322,7 @@ int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSc
// structs =======================
struct STsdbFS {
int64_t version;
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<SDFileSet>
};
......@@ -575,9 +576,6 @@ struct SDFileSet {
SSttFile *aSttF[TSDB_MAX_STT_TRIGGER];
};
#define SET_DFSET_EXPIRED(d) ((d)->diskId.id = -1)
#define IS_DFSET_EXPIRED(d) ((d)->diskId.id == -1)
struct SRowIter {
TSDBROW *pRow;
STSchema *pTSchema;
......
......@@ -988,21 +988,33 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
bool inTrim = atomic_load_8(&pTsdb->pVnode->trimDbH.state);
STsdbFS fsLatest = {0};
ASSERT(eno == 0);
if (inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock);
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
ASSERT(pCommitter->fs.version <= pTsdb->fs.version);
if (pCommitter->fs.version < pTsdb->fs.version) {
if ((code = tsdbFSCopy(pTsdb, &fsLatest))) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
}
if ((code = tsdbFSUpdDel(pTsdb, &pCommitter->fs, &fsLatest, pTsdb->trimHdl.minCommitFid - 1))) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
}
}
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
if (code) {
if (inTrim) taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
}
// lock
if (!inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock);
// commit or rollback
code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
if (code) {
......
......@@ -419,6 +419,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb) {
int32_t code = 0;
// open handle
pTsdb->fs.version = 0;
pTsdb->fs.pDelFile = NULL;
pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pTsdb->fs.aDFileSet == NULL) {
......@@ -534,6 +535,7 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
int32_t code = 0;
pFS->version = pTsdb->fs.version;
pFS->pDelFile = NULL;
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
if (pFS->aDFileSet == NULL) {
......@@ -746,12 +748,12 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid
if (pSetOld && pSetNew) {
if (pSetOld->fid == pSetNew->fid) {
if (IS_DFSET_EXPIRED(pSetNew)) goto _remove_old;
goto _merge_migrate;
} else if (pSetOld->fid < pSetNew->fid) {
++iOld;
} else if (pSetOld->fid > pSetNew->fid) {
goto _remove_old;
} else {
++iNew;
++iOld;
ASSERT(0);
}
continue;
} else {
......@@ -794,8 +796,8 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid
pSetOld->diskId = pSetNew->diskId;
}
iOld++;
iNew++;
++iOld;
++iNew;
continue;
_remove_old:
......@@ -806,7 +808,7 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid
}
taosMemoryFree(pSetOld->pSmaF);
taosArrayRemove(pFS->aDFileSet, iOld);
iNew++;
++iNew;
continue;
}
......@@ -850,6 +852,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t nRef;
char fname[TSDB_FILENAME_LEN];
++pTsdb->fs.version;
// del
if (pFSNew->pDelFile) {
SDelFile *pDelFile = pTsdb->fs.pDelFile;
......
......@@ -17,7 +17,7 @@
enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 };
#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB
#define MIGRATE_MAX_SPEED (1048576 << 4) // 16 MB, vnode level
#define MIGRATE_MIN_COST (5) // second
static bool tsdbShouldDoMigrate(STsdb *pTsdb);
......@@ -105,8 +105,8 @@ _retention_loop:
code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _exit;
int32_t fsSize = taosArrayGetSize(fs.aDFileSet);
if (type == RETENTION_MIGRATE) {
int32_t fsSize = taosArrayGetSize(fs.aDFileSet);
for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
......@@ -119,26 +119,35 @@ _retention_loop:
maxFid = pSet->fid;
fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size);
if (fSize / speed > MIGRATE_MIN_COST) {
tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
break;
}
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
fSize += pSet->aSttF[iStt]->size;
}
if (fSize / speed > MIGRATE_MIN_COST) {
tsdbDebug("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
break;
}
}
}
} else if (type == RETENTION_EXPIRED) {
for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (expLevel < 0) {
SET_DFSET_EXPIRED(pSet);
ASSERT(pSet->fid > maxFid);
if (pSet->fid > maxFid) maxFid = pSet->fid;
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pSmaF);
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
taosMemoryFree(pSet->aSttF[iStt]);
}
taosArrayRemove(fs.aDFileSet, iSet);
--iSet;
} else {
break;
}
......@@ -167,18 +176,25 @@ _commit_conflict_check:
// migrate
if (type == RETENTION_MIGRATE) {
for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (pSet->fid > maxFid) break;
tsdbDebug("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode),
tsdbDebug("vgId:%d migrate loop %d with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode),
nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel);
if (expLevel < 0) {
SET_DFSET_EXPIRED(pSet);
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pSmaF);
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
taosMemoryFree(pSet->aSttF[iStt]);
}
taosArrayRemove(fs.aDFileSet, iSet);
--iSet;
} else {
if (expLevel == pSet->diskId.level) continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册