提交 9c0f0525 编写于 作者: C Cary Xu

process upgrade compatibility

上级 da30668b
......@@ -17,12 +17,25 @@
#define _TD_TSDB_FS_H_
/**
* The fileset .head/.data/.last/.sma use the same TSDB_FS_VERSION.
* 0 - original format before 2021.08.25 // TODO update date 2021.08.25 to release version.
* 1 - extract aggregate block data from .data file and save to separate .sma file since 2021.08.25 // TODO update
* date to release version.
* 1. The fileset .head/.data/.last/.sma use the same fver 0 before 2021.09.05.
* 2. .head fver is 1 when extract aggregate block data from .data file and save to separate .sma file since 2021.09.05
* // TODO update date and add release version.
*/
#define TSDB_FS_VERSION 1
typedef enum {
TSDB_FS_VERSION_0,
TSDB_FS_VERSION_1,
} ETsdbFsVersion;
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T ftype) { // for DFile
switch (ftype) {
case TSDB_FILE_HEAD:
return TSDB_FS_VERSION_1;
default:
return TSDB_FS_VERSION_0;
}
}
static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VERSION_1; } // for current
// ================== TSDB global config
extern bool tsdbForceKeepFile;
......
......@@ -37,9 +37,11 @@
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
#define TSDB_FSET_NFILES_VALID(s) (((s)->nFiles >= TSDB_FILE_MIN) && ((s)->nFiles <= TSDB_FILE_MAX))
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_SMA, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
#define TSDB_FILE_MIN 3U // min number of files in one DFileSet
// =============== SMFile
typedef struct {
int64_t size;
......@@ -166,6 +168,7 @@ typedef struct {
uint32_t offset;
uint64_t size;
uint64_t tombSize;
uint32_t fver;
} SDFInfo;
typedef struct {
......@@ -179,8 +182,9 @@ void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver,
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader, TSDB_FILE_T ftype);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbUpdateDFileHeaderEx(SDFile* pDFile, uint32_t fver);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
......@@ -283,32 +287,39 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
typedef struct {
int fid;
int state;
SDFile files[TSDB_FILE_MAX];
int fid;
int state;
uint8_t nFiles;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_SET_CLOSED(s) \
#define TSDB_FSET_SET_INIT(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
#define TSDB_FSET_FSYNC(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \
#define TSDB_FSET_SET_CLOSED(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < (s)->nFiles; ftype++) { \
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
#define TSDB_FSET_FSYNC(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < (s)->nFiles; ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, bool containNFiles);
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
......@@ -317,13 +328,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ASSERT(pSet->nFiles <= TSDB_FILE_MAX);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ASSERT(TSDB_FSET_NFILES_VALID(pSet));
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
tsdbCloseDFileSet(pSet);
return -1;
......@@ -333,13 +346,15 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
}
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ASSERT(TSDB_FSET_NFILES_VALID(pSet));
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ASSERT(TSDB_FSET_NFILES_VALID(pSrc));
for (TSDB_FILE_T ftype = 0; ftype < pSrc->nFiles; ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
tsdbRemoveDFileSet(pDest);
return -1;
......
......@@ -84,8 +84,6 @@ typedef struct {
int64_t sum;
int64_t max;
int64_t min;
// uint8_t type;
// char reserved[15]; // Adjust the size of reserved array whenever adding new field of SAggrBlkCol.
} SAggrBlkCol;
// Code here just for back-ward compatibility
......@@ -122,11 +120,9 @@ struct SReadH {
int cidx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData; // Block info
#ifdef __TD_6117__
SAggrBlkData *pAggrBlkData; // Block info
#endif
SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2];
void * pRBuf; // buffer
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
......@@ -139,7 +135,7 @@ struct SReadH {
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_AGGR_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMA)
#define TSDB_READ_BUF(rh) ((rh)->pRBuf)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
......
......@@ -844,7 +844,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
memset(pCommith, 0, sizeof(*pCommith));
tsdbGetRtnSnap(pRepo, &(pCommith->rtn));
TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith));
TSDB_FSET_SET_INIT(TSDB_COMMIT_WRITE_FSET(pCommith));
// Init read handle
if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) {
......@@ -1571,7 +1571,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
pCommith->isSmaFileSame = false;
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
} else {
......@@ -1580,11 +1580,12 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
pCommith->wSet.fid = fid;
pCommith->wSet.state = 0;
pCommith->wSet.nFiles = TSDB_FILE_MAX;
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf, true) < 0) {
if (tsdbCreateDFile(pWHeadf, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
......@@ -1598,7 +1599,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
tsdbInitDFileEx(pWDataf, pRDataf);
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
if (tsdbOpenDFile(pWDataf, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
tstrerror(terrno));
......@@ -1618,7 +1619,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFileEx(pWLastf, pRLastf);
pCommith->isLFileSame = true;
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
if (tsdbOpenDFile(pWLastf, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
......@@ -1633,7 +1634,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pWLastf, true) < 0) {
if (tsdbCreateDFile(pWLastf, true, TSDB_FILE_LAST) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
......@@ -1647,21 +1648,41 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
// TSDB_FILE_SMA
ASSERT(pWSet->nFiles >= TSDB_FILE_SMA);
SDFile *pRSmaf = TSDB_READ_AGGR_FILE(&(pCommith->readh));
SDFile *pWSmaf = TSDB_COMMIT_AGGR_FILE(pCommith);
tsdbInitDFileEx(pWSmaf, pRSmaf);
if (tsdbOpenDFile(pWSmaf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
if (access(TSDB_FILE_FULL_NAME(pRSmaf), F_OK) != 0) {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmaf));
tsdbInitDFile(pWSmaf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMA);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pWSmaf, true, TSDB_FILE_SMA) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFileEx(pWSmaf, pRSmaf);
pCommith->isSmaFileSame = true;
if (tsdbOpenDFile(pWSmaf, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
pCommith->isSmaFileSame = true;
}
return 0;
......
......@@ -270,7 +270,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
memset(pComph, 0, sizeof(*pComph));
TSDB_FSET_SET_CLOSED(TSDB_COMPACT_WSET(pComph));
TSDB_FSET_SET_INIT(TSDB_COMPACT_WSET(pComph));
tsdbGetRtnSnap(pRepo, &(pComph->rtn));
tsdbFSIterInit(&(pComph->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
......
......@@ -36,6 +36,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdbRepo *pRepo);
static int tsdbCreateMeta(STsdbRepo *pRepo);
static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray);
// For backward compatibility
// ================== CURRENT file header info
......@@ -89,15 +90,16 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return tlen;
}
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFiles) {
uint64_t nset;
SDFileSet dset;
dset.nFiles = TSDB_FILE_MIN; // default value: .head/.data/.last
taosArrayClear(pArray);
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
buf = tsdbDecodeDFileSet(buf, &dset);
buf = tsdbDecodeDFileSet(buf, &dset, containNFiles);
taosArrayPush(pArray, (void *)(&dset));
}
return buf;
......@@ -114,13 +116,12 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen;
}
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus, bool containNFiles) {
tsdbResetFSStatus(pStatus);
pStatus->pmf = &(pStatus->mf);
buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df, containNFiles);
return buf;
}
......@@ -414,7 +415,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
return -1;
}
fsheader.version = TSDB_FS_VERSION;
fsheader.version = tsdbGetSFSVersion();
if (pStatus->pmf == NULL) {
ASSERT(taosArrayGetSize(pStatus->df) == 0);
fsheader.len = 0;
......@@ -689,7 +690,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
ptr = tsdbDecodeFSHeader(ptr, &fsheader);
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
if (fsheader.version != TSDB_FS_VERSION) {
if (fsheader.version != TSDB_FS_VERSION_0) {
// TODO: handle file version change
}
......@@ -718,7 +719,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
}
ptr = buffer;
ptr = tsdbDecodeFSStatus(ptr, pStatus);
ptr = tsdbDecodeFSStatus(ptr, pStatus, fsheader.version == TSDB_FS_VERSION_0 ? false : true);
} else {
tsdbResetFSStatus(pStatus);
}
......@@ -752,7 +753,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i);
if (tsdbScanAndTryFixDFileSet(pRepo, pSet) < 0) {
tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d failed to fix DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
......@@ -1098,25 +1099,23 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) {
return 0;
}
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL;
const TFILE *pf = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$";
SArray * fArray = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|sma)(-ver[0-9]+)?$";
regex_t regex;
STsdbFS * pfs = REPO_FS(pRepo);
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
// Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED);
fArray = taosArrayInit(1024, sizeof(TFILE));
if (fArray == NULL) {
*fArray = taosArrayInit(1024, sizeof(TFILE));
if (*fArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tsdbError("vgId:%d failed to fetch TFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
regfree(&regex);
return -1;
......@@ -1124,9 +1123,9 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
tdir = tfsOpendir(dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tsdbError("vgId:%d failed to fetch TFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
taosArrayDestroy(fArray);
taosArrayDestroy(*fArray);
regfree(&regex);
return -1;
}
......@@ -1136,10 +1135,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) {
if (taosArrayPush(fArray, (void *)pf) == NULL) {
if (taosArrayPush(*fArray, (void *)pf) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tfsClosedir(tdir);
taosArrayDestroy(fArray);
taosArrayDestroy(*fArray);
regfree(&regex);
return -1;
}
......@@ -1150,10 +1149,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
continue;
} else {
// Has other error
tsdbError("vgId:%d failed to restore DFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code));
tsdbError("vgId:%d failed to fetch TFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
tfsClosedir(tdir);
taosArrayDestroy(fArray);
taosArrayDestroy(*fArray);
regfree(&regex);
return -1;
}
......@@ -1163,7 +1162,19 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
regfree(&regex);
// Sort the array according to file name
taosArraySort(fArray, tsdbComparTFILE);
taosArraySort(*fArray, tsdbComparTFILE);
return 0;
}
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
const TFILE *pf = NULL;
SArray * fArray = NULL;
STsdbFS * pfs = REPO_FS(pRepo);
if (tsdbFetchTFileSet(pRepo, &fArray) < 0) {
tsdbError("vgId:%d failed to fetch TFileSet to restore since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
size_t index = 0;
// Loop to recover each file set
......@@ -1174,7 +1185,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
SDFileSet fset = {0};
TSDB_FSET_SET_CLOSED(&fset);
TSDB_FSET_SET_INIT(&fset);
// Loop to recover ONE fset
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
......@@ -1335,7 +1346,7 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) {
continue;
}
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
for (TSDB_FILE_T ftype = 0; ftype < fset.nFiles; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if ((tsdbLoadDFileHeader(pDFile, &info) < 0) || pDFile->info.size != info.size ||
......
......@@ -199,7 +199,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
tsdbInitMFileEx(&mf, pMFile);
if (access(TSDB_FILE_FULL_NAME(pMFile), F_OK) != 0) {
tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
tsdbError("vgId:%d meta file %s not exist, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile));
pRepo->state |= TSDB_STATE_BAD_META;
TSDB_FILE_SET_STATE(pMFile, TSDB_FILE_STATE_BAD);
......@@ -302,6 +302,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(vid, fid, ver, ftype, fname);
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
......@@ -350,10 +351,10 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return buf;
}
int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (pDFile->fd < 0) {
if (errno == ENOENT) {
// Try to create directory recursively
......@@ -364,7 +365,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
}
tfree(s);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -381,7 +382,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
pDFile->info.size += TSDB_FILE_HEAD_SIZE;
if (tsdbUpdateDFileHeader(pDFile) < 0) {
if (tsdbUpdateDFileHeaderEx(pDFile, tsdbGetDFSVersion(fType)) < 0) {
tsdbCloseDFile(pDFile);
tsdbRemoveDFile(pDFile);
return -1;
......@@ -390,15 +391,53 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
return 0;
}
// keep the fver in DFileHeader(e.g. during fs check in openFS)
int tsdbUpdateDFileHeader(SDFile *pDFile) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
tsdbDebug("prop:file %s seek to read fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
if (tsdbReadDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
tsdbDebug("prop:file %s read fail, fd is %d", TSDB_FILE_FULL_NAME(pDFile), pDFile->fd);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbDebug("prop:file %s checksum fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
void *ptr = POINTER_SHIFT(buf, sizeof(uint32_t));
tsdbEncodeDFInfo(&ptr, &(pDFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
tsdbDebug("prop:file %s seek to write fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
if (tsdbWriteDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
tsdbDebug("prop:file %s write fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
return 0;
}
// update the fver in DFileHeader
int tsdbUpdateDFileHeaderEx(SDFile *pDFile, uint32_t fver) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
return -1;
}
void *ptr = buf;
taosEncodeFixedU32(&ptr, TSDB_FS_VERSION);
taosEncodeFixedU32(&ptr, fver);
tsdbEncodeDFInfo(&ptr, &(pDFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
......@@ -411,7 +450,7 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) {
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
uint32_t _version;
// uint32_t _version;
ASSERT(TSDB_FILE_OPENED(pDFile));
......@@ -429,7 +468,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
}
void *pBuf = buf;
pBuf = taosDecodeFixedU32(pBuf, &_version);
// pBuf = taosDecodeFixedU32(pBuf, &_version);
pBuf = tsdbDecodeDFInfo(pBuf, pInfo);
return 0;
}
......@@ -441,7 +480,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
tsdbInitDFileEx(&df, pDFile);
if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) {
tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
tsdbError("vgId:%d data file %s not exist, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile));
pRepo->state |= TSDB_STATE_BAD_DATA;
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
......@@ -454,7 +493,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
}
if (pDFile->info.size < dfstat.st_size) {
if (tsdbOpenDFile(&df, O_WRONLY) < 0) {
if (tsdbOpenDFile(&df, O_RDWR) < 0) {
return -1;
}
......@@ -489,6 +528,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
......@@ -501,6 +541,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
}
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
......@@ -560,6 +601,7 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) {
pSet->fid = fid;
pSet->state = 0;
pSet->nFiles = TSDB_FILE_MAX;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
......@@ -568,8 +610,10 @@ void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t v
}
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
ASSERT(TSDB_FSET_NFILES_VALID(pOSet));
pSet->fid = pOSet->fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
pSet->nFiles = pOSet->nFiles;
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
}
}
......@@ -578,6 +622,7 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedU8(buf, pSet->nFiles);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -585,13 +630,17 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
return tlen;
}
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, bool containNFiles) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0;
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (containNFiles) {
buf = taosDecodeFixedU8(buf, &(pSet->nFiles));
}
ASSERT(TSDB_FSET_NFILES_VALID(pSet));
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
return buf;
......@@ -620,7 +669,8 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
}
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ASSERT(from == NULL || TSDB_FSET_NFILES_VALID(from));
for (TSDB_FILE_T ftype = 0; ftype < from->nFiles; ftype++) {
SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL;
SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL;
if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) {
......@@ -633,7 +683,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet);
return -1;
......@@ -644,7 +694,7 @@ int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
}
int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
if (tsdbUpdateDFileHeader(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1;
}
......@@ -653,7 +703,8 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
}
int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ASSERT(TSDB_FSET_NFILES_VALID(pSet));
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1;
}
......
......@@ -33,8 +33,7 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
memset((void *)pReadh, 0, sizeof(*pReadh));
pReadh->pRepo = pRepo;
TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));
TSDB_FSET_SET_INIT(TSDB_READ_FSET(pReadh));
pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pReadh->aBlkIdx == NULL) {
......@@ -65,7 +64,7 @@ void tsdbDestroyReadH(SReadH *pReadh) {
pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
#endif
pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
pReadh->pRBuf = taosTZfree(pReadh->pRBuf);
pReadh->pBuf = taosTZfree(pReadh->pBuf);
pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);
pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
......@@ -728,7 +727,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return -1;
}
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pRBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
pBlockCol->colId, offset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册