提交 4288e993 编写于 作者: C Cary Xu

code optimization

上级 ef2683d3
...@@ -37,7 +37,11 @@ ...@@ -37,7 +37,11 @@
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
#define TSDB_FSET_NFILES_VALID(s) (((s)->nFiles >= TSDB_FILE_MIN) && ((s)->nFiles <= TSDB_FILE_MAX)) #define ASSERT_TSDB_FSET_NFILES_VALID(s) \
do { \
uint8_t nDFiles = tsdbGetNFiles(s); \
ASSERT((nDFiles >= TSDB_FILE_MIN) && (nDFiles <= TSDB_FILE_MAX)); \
} while (0)
typedef enum { typedef enum {
TSDB_FILE_HEAD = 0, TSDB_FILE_HEAD = 0,
TSDB_FILE_DATA, TSDB_FILE_DATA,
...@@ -297,10 +301,28 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { ...@@ -297,10 +301,28 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
typedef struct { typedef struct {
int fid; int fid;
int state; int state;
uint8_t nFiles; uint8_t ver; // fset version
SDFile files[TSDB_FILE_MAX]; SDFile files[TSDB_FILE_MAX];
} SDFileSet; } SDFileSet;
typedef enum {
TSDB_FSET_VER_0, // .head/.data/.last
TSDB_FSET_VER_1, // .head/.data/.last/.smad/.smal
} ETsdbFSetVer;
#define TSDB_LATEST_FSET_VER TSDB_FSET_VER_1
// get nDFiles in SDFileSet
static FORCE_INLINE uint8_t tsdbGetNFiles(SDFileSet* pSet) {
switch (pSet->ver) {
case TSDB_FSET_VER_0:
return TSDB_FILE_MIN;
case TSDB_FSET_VER_1:
default:
return TSDB_FILE_MAX;
}
}
#define TSDB_FSET_FID(s) ((s)->fid) #define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
...@@ -311,20 +333,20 @@ typedef struct { ...@@ -311,20 +333,20 @@ typedef struct {
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \ } \
} while (0); } while (0);
#define TSDB_FSET_SET_CLOSED(s) \ #define TSDB_FSET_SET_CLOSED(s) \
do { \ do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < (s)->nFiles; ftype++) { \ for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < tsdbGetNFiles(s); ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \ TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \ } \
} while (0); } while (0);
#define TSDB_FSET_FSYNC(s) \ #define TSDB_FSET_FSYNC(s) \
do { \ do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < (s)->nFiles; ftype++) { \ for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < tsdbGetNFiles(s); ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \ TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \ } \
} while (0); } while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t nFiles); void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t fsetVer);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver);
...@@ -336,15 +358,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet); ...@@ -336,15 +358,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
ASSERT(pSet->nFiles <= TSDB_FILE_MAX); ASSERT(tsdbGetNFiles(pSet) <= TSDB_FILE_MAX);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype)); tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
} }
} }
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
ASSERT(TSDB_FSET_NFILES_VALID(pSet)); ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) { if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
tsdbCloseDFileSet(pSet); tsdbCloseDFileSet(pSet);
return -1; return -1;
...@@ -354,15 +376,15 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { ...@@ -354,15 +376,15 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
} }
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
ASSERT(TSDB_FSET_NFILES_VALID(pSet)); ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype)); (void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
} }
} }
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) { static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
ASSERT(TSDB_FSET_NFILES_VALID(pSrc)); ASSERT_TSDB_FSET_NFILES_VALID(pSrc);
for (TSDB_FILE_T ftype = 0; ftype < pSrc->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSrc); ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
tsdbRemoveDFileSet(pDest); tsdbRemoveDFileSet(pDest);
return -1; return -1;
......
...@@ -139,7 +139,7 @@ int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { ...@@ -139,7 +139,7 @@ int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
if (did.level > TSDB_FSET_LEVEL(pSet)) { if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level // Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs), TSDB_FILE_MAX); tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs), pSet->ver);
if (tsdbCopyDFileSet(pSet, &nSet) < 0) { if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
...@@ -1562,7 +1562,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1562,7 +1562,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Set and open commit FSET // Set and open commit FSET
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data // Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_MAX); tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(pWSet, true) < 0) { if (tsdbCreateDFileSet(pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
...@@ -1583,8 +1583,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1583,8 +1583,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
pCommith->wSet.fid = fid; pCommith->wSet.fid = fid;
pCommith->wSet.state = 0; pCommith->wSet.state = 0;
pCommith->wSet.nFiles = TSDB_FILE_MAX; pCommith->wSet.ver = TSDB_LATEST_FSET_VER;
// TSDB_FILE_HEAD // TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
...@@ -1685,7 +1685,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid ...@@ -1685,7 +1685,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
} }
// TSDB_FILE_SMAL // TSDB_FILE_SMAL
ASSERT(pWSet->nFiles >= TSDB_FILE_SMAL); ASSERT(tsdbGetNFiles(pWSet) >= TSDB_FILE_SMAL);
SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh)); SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith); SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
......
...@@ -197,7 +197,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -197,7 +197,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
} }
tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_MAX); FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) { if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph); tsdbCompactFSetEnd(pComph);
......
...@@ -93,7 +93,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { ...@@ -93,7 +93,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
static int tsdbDecodeDFileSetArray(void **originBuf, void *buf, SArray *pArray, SFSHeader *pSFSHeader) { static int tsdbDecodeDFileSetArray(void **originBuf, void *buf, SArray *pArray, SFSHeader *pSFSHeader) {
uint64_t nset; uint64_t nset;
SDFileSet dset; SDFileSet dset;
dset.nFiles = TSDB_FILE_MIN; // default value: .head/.data/.last dset.ver = TSDB_FSET_VER_0; // default value
taosArrayClear(pArray); taosArrayClear(pArray);
...@@ -986,7 +986,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { ...@@ -986,7 +986,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
SDFileSet *pSet; SDFileSet *pSet;
while ((pSet = tsdbFSIterNext(&fsiter))) { while ((pSet = tsdbFSIterNext(&fsiter))) {
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
if (tfsIsSameFile(pf, TSDB_FILE_F(pDFile))) { if (tfsIsSameFile(pf, TSDB_FILE_F(pDFile))) {
return true; return true;
...@@ -1288,7 +1288,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ...@@ -1288,7 +1288,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
} }
tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid); tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid);
fset.nFiles = 3; fset.ver = TSDB_LATEST_FSET_VER;
taosArrayPush(pfs->cstatus->df, &fset); taosArrayPush(pfs->cstatus->df, &fset);
} }
...@@ -1323,6 +1323,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ...@@ -1323,6 +1323,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
// Loop to recover each file set // Loop to recover each file set
SDFileSet fset = {0}; SDFileSet fset = {0};
uint8_t nDFiles = 0;
bool isOneFSetFinish = true; bool isOneFSetFinish = true;
int lastFType = -1; int lastFType = -1;
// one fileset ends when (1) the array ends or (2) encounter different fid // one fileset ends when (1) the array ends or (2) encounter different fid
...@@ -1349,29 +1350,29 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ...@@ -1349,29 +1350,29 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
if (index == 0) { if (index == 0) {
memset(&fset, 0, sizeof(SDFileSet)); memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_INIT(&fset); TSDB_FSET_SET_INIT(&fset);
fset.nFiles = 1; nDFiles = 1;
fset.fid = tfid; fset.fid = tfid;
pDFile->f = *pf; pDFile->f = *pf;
isOneFSetFinish = false; isOneFSetFinish = false;
} else { } else {
if (fset.fid == tfid) { if (fset.fid == tfid) {
++fset.nFiles; ++nDFiles;
pDFile->f = *pf; pDFile->f = *pf;
// (1) the array ends // (1) the array ends
if ((index == fArraySize - 1) && (fset.nFiles >= TSDB_FILE_MIN)) { if ((index == fArraySize - 1) && (nDFiles >= TSDB_FILE_MIN)) {
tsdbInfo("vgId:%d DFileSet %d is fetched, nFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, fset.nFiles); tsdbInfo("vgId:%d DFileSet %d is fetched, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
isOneFSetFinish = true; isOneFSetFinish = true;
} }
} else { } else {
// (2) encounter different fid // (2) encounter different fid
if (fset.nFiles >= TSDB_FILE_MIN) { if (nDFiles >= TSDB_FILE_MIN) {
tsdbInfo("vgId:%d DFileSet %d is fetched, nFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, fset.nFiles); tsdbInfo("vgId:%d DFileSet %d is fetched, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
isOneFSetFinish = true; isOneFSetFinish = true;
} else { } else {
// next FSet // next FSet
memset(&fset, 0, sizeof(SDFileSet)); memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_INIT(&fset); TSDB_FSET_SET_INIT(&fset);
fset.nFiles = 1; nDFiles = 1;
fset.fid = tfid; fset.fid = tfid;
pDFile->f = *pf; pDFile->f = *pf;
isOneFSetFinish = false; isOneFSetFinish = false;
...@@ -1381,7 +1382,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ...@@ -1381,7 +1382,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
} }
if (isOneFSetFinish) { if (isOneFSetFinish) {
for (TSDB_FILE_T ftype = 0; ftype < fset.nFiles; ++ftype) { for (TSDB_FILE_T ftype = 0; ftype < nDFiles; ++ftype) {
SDFile * pDFile1 = TSDB_DFILE_IN_SET(&fset, ftype); SDFile * pDFile1 = TSDB_DFILE_IN_SET(&fset, ftype);
if (tsdbOpenDFile(pDFile1, O_RDONLY) < 0) { if (tsdbOpenDFile(pDFile1, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1), tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
...@@ -1418,12 +1419,20 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ...@@ -1418,12 +1419,20 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
tsdbCloseDFile(pDFile1); tsdbCloseDFile(pDFile1);
} }
tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid); tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid);
// TODO: update the logic when TSDB_FSET_VER definition update.
if (nDFiles == TSDB_FILE_MIN) {
fset.ver = TSDB_FSET_VER_0;
} else {
fset.ver = TSDB_LATEST_FSET_VER;
}
taosArrayPush(pfs->cstatus->df, &fset); taosArrayPush(pfs->cstatus->df, &fset);
// next FSet // next FSet
memset(&fset, 0, sizeof(SDFileSet)); memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_INIT(&fset); TSDB_FSET_SET_INIT(&fset);
fset.nFiles = 1; nDFiles = 1;
fset.fid = tfid; fset.fid = tfid;
pDFile->f = *pf; pDFile->f = *pf;
isOneFSetFinish = false; isOneFSetFinish = false;
...@@ -1512,7 +1521,7 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) { ...@@ -1512,7 +1521,7 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) {
continue; continue;
} }
for (TSDB_FILE_T ftype = 0; ftype < fset.nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(&fset); ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if ((tsdbLoadDFileHeader(pDFile, &info) < 0) || pDFile->info.size != info.size || if ((tsdbLoadDFileHeader(pDFile, &info) < 0) || pDFile->info.size != info.size ||
......
...@@ -563,22 +563,22 @@ static int tsdbRollBackDFile(SDFile *pDFile) { ...@@ -563,22 +563,22 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
} }
// ============== Operations on SDFileSet // ============== Operations on SDFileSet
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t nFiles) { void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver, uint8_t fsetVer) {
pSet->fid = fid; pSet->fid = fid;
pSet->state = 0; pSet->state = 0;
pSet->nFiles = nFiles; pSet->ver = fsetVer;
for (TSDB_FILE_T ftype = 0; ftype < nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
tsdbInitDFile(pDFile, did, vid, fid, ver, ftype); tsdbInitDFile(pDFile, did, vid, fid, ver, ftype);
} }
} }
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) { void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
ASSERT(TSDB_FSET_NFILES_VALID(pOSet)); ASSERT_TSDB_FSET_NFILES_VALID(pOSet);
pSet->fid = pOSet->fid; pSet->fid = pOSet->fid;
pSet->nFiles = pOSet->nFiles; pSet->ver = pOSet->ver;
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype)); tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
} }
} }
...@@ -587,8 +587,8 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { ...@@ -587,8 +587,8 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid); tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedU8(buf, pSet->nFiles); tlen += taosEncodeFixedU8(buf, pSet->ver);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
} }
...@@ -603,11 +603,11 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, uint32_t sfver) { ...@@ -603,11 +603,11 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, uint32_t sfver) {
pSet->fid = fid; pSet->fid = fid;
if (sfver > TSDB_FS_VER_0) { if (sfver > TSDB_FS_VER_0) {
buf = taosDecodeFixedU8(buf, &(pSet->nFiles)); buf = taosDecodeFixedU8(buf, &(pSet->ver));
} }
ASSERT(TSDB_FSET_NFILES_VALID(pSet)); ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype), sfver); buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype), sfver);
} }
return buf; return buf;
...@@ -617,8 +617,8 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) { ...@@ -617,8 +617,8 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid); tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedU8(buf, pSet->nFiles); tlen += taosEncodeFixedU8(buf, pSet->ver);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
} }
...@@ -629,17 +629,17 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) { ...@@ -629,17 +629,17 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
int32_t fid; int32_t fid;
buf = taosDecodeFixedI32(buf, &(fid)); buf = taosDecodeFixedI32(buf, &(fid));
buf = taosDecodeFixedU8(buf, &(pSet->nFiles)); buf = taosDecodeFixedU8(buf, &(pSet->ver));
pSet->fid = fid; pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
} }
return buf; return buf;
} }
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
ASSERT(from == NULL || TSDB_FSET_NFILES_VALID(from)); uint8_t nDFiles = (from == NULL) ? TSDB_FILE_MAX : tsdbGetNFiles(from);
for (TSDB_FILE_T ftype = 0; ftype < from->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < nDFiles; ftype++) {
SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL; SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL;
SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL; SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL;
if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) { if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) {
...@@ -651,7 +651,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { ...@@ -651,7 +651,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
} }
int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) { int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) { if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
tsdbCloseDFileSet(pSet); tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet); tsdbRemoveDFileSet(pSet);
...@@ -663,7 +663,7 @@ int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) { ...@@ -663,7 +663,7 @@ int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
} }
int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
if (tsdbUpdateDFileHeader(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { if (tsdbUpdateDFileHeader(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1; return -1;
} }
...@@ -672,8 +672,8 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { ...@@ -672,8 +672,8 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
} }
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
ASSERT(TSDB_FSET_NFILES_VALID(pSet)); ASSERT_TSDB_FSET_NFILES_VALID(pSet);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1; return -1;
} }
......
...@@ -466,7 +466,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -466,7 +466,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
return -1; return -1;
} }
tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs), pSynch->pdf->nFiles); tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs), pSynch->pdf->ver);
// Create new FSET // Create new FSET
if (tsdbCreateDFileSet(&fset, false) < 0) { if (tsdbCreateDFileSet(&fset, false) < 0) {
...@@ -474,7 +474,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -474,7 +474,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
return -1; return -1;
} }
for (TSDB_FILE_T ftype = 0; ftype < pSynch->pdf->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSynch->pdf); ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file
SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file
...@@ -550,10 +550,10 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -550,10 +550,10 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
} }
static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
if (pSet1->nFiles != pSet2->nFiles) { if (pSet1->ver != pSet2->ver) {
return false; return false;
} }
for (TSDB_FILE_T ftype = 0; ftype < pSet1->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet1); ftype++) {
SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype); SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype);
SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype); SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype);
...@@ -595,7 +595,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { ...@@ -595,7 +595,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
if (toSend) { if (toSend) {
tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid); tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype); SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype);
if (tsdbOpenDFile(&df, O_RDONLY) < 0) { if (tsdbOpenDFile(&df, O_RDONLY) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册