提交 eab26d39 编写于 作者: H Hongze Cheng

match with old file format

上级 22322035
...@@ -177,6 +177,8 @@ int tsdbEncodeSDFile(void** buf, SDFile* pDFile); ...@@ -177,6 +177,8 @@ int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader); int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
int tsdbUpdateDFileHeader(SDFile* pDFile); int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; } static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; }
......
...@@ -21,7 +21,7 @@ static const char *tsdbTxnFname[] = {"current.t", "current"}; ...@@ -21,7 +21,7 @@ static const char *tsdbTxnFname[] = {"current.t", "current"};
static int tsdbComparFidFSet(const void *arg1, const void *arg2); static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus); static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbApplyFSTxn(STsdbFS *pfs, int vid); static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
...@@ -30,6 +30,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo); ...@@ -30,6 +30,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo);
static int tsdbScanDataDir(STsdbRepo *pRepo); static int tsdbScanDataDir(STsdbRepo *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static int tsdbRestoreCurrent(STsdbRepo *pRepo); static int tsdbRestoreCurrent(STsdbRepo *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2);
// ================== CURRENT file header info // ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
...@@ -291,7 +292,7 @@ int tsdbEndFSTxn(STsdbRepo *pRepo) { ...@@ -291,7 +292,7 @@ int tsdbEndFSTxn(STsdbRepo *pRepo) {
SFSStatus *pStatus; SFSStatus *pStatus;
// Write current file system snapshot // Write current file system snapshot
if (tsdbApplyFSTxn(pfs, REPO_ID(pRepo)) < 0) { if (tsdbSaveFSStatus(pfs->nstatus, REPO_ID(pRepo)) < 0) {
tsdbEndFSTxnWithError(pfs); tsdbEndFSTxnWithError(pfs);
return -1; return -1;
} }
...@@ -321,8 +322,7 @@ void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pf ...@@ -321,8 +322,7 @@ void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pf
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); } int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) { static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
ASSERT(FS_IN_TXN(pfs));
SFSHeader fsheader; SFSHeader fsheader;
void * pBuf = NULL; void * pBuf = NULL;
void * ptr; void * ptr;
...@@ -340,17 +340,17 @@ static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) { ...@@ -340,17 +340,17 @@ static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) {
} }
fsheader.version = TSDB_FS_VERSION; fsheader.version = TSDB_FS_VERSION;
if (pfs->nstatus->pmf == NULL) { if (pStatus->pmf == NULL) {
ASSERT(taosArrayGetSize(pfs->nstatus->df) == 0); ASSERT(taosArrayGetSize(pStatus->df) == 0);
fsheader.len = 0; fsheader.len = 0;
} else { } else {
fsheader.len = tsdbEncodeFSStatus(NULL, pfs->nstatus) + sizeof(TSCKSUM); fsheader.len = tsdbEncodeFSStatus(NULL, pStatus) + sizeof(TSCKSUM);
} }
// Encode header part and write // Encode header part and write
ptr = hbuf; ptr = hbuf;
tsdbEncodeFSHeader(&ptr, &fsheader); tsdbEncodeFSHeader(&ptr, &fsheader);
tsdbEncodeFSMeta(&ptr, &(pfs->nstatus->meta)); tsdbEncodeFSMeta(&ptr, &(pStatus->meta));
taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE); taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);
...@@ -370,7 +370,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) { ...@@ -370,7 +370,7 @@ static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) {
} }
ptr = pBuf; ptr = pBuf;
tsdbEncodeFSStatus(&ptr, pfs->nstatus); tsdbEncodeFSStatus(&ptr, pStatus);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len); taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);
if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) { if (taosWrite(fd, pBuf, fsheader.len) < fsheader.len) {
...@@ -900,42 +900,290 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { ...@@ -900,42 +900,290 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
return false; return false;
} }
static int tsdbRestoreCurrent(STsdbRepo *pRepo) { static int tsdbRestoreMeta(STsdbRepo *pRepo) {
char rootDir[TSDB_FILENAME_LEN]; char rootDir[TSDB_FILENAME_LEN];
char dataDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL; TDIR * tdir = NULL;
const TFILE *pf = NULL; const TFILE *pf = NULL;
char bname[TSDB_FILENAME_LEN]; const char * pattern = "^meta(-ver[0-9]+)?$";
regex_t regex;
STsdbFS * pfs = REPO_FS(pRepo);
regcomp(&regex, pattern, REG_EXTENDED);
tsdbInfo("vgId:%d try to restore meta", REPO_ID(pRepo));
// Loop to recover mfile
tsdbGetRootDir(REPO_ID(pRepo), rootDir); tsdbGetRootDir(REPO_ID(pRepo), rootDir);
tdir = tfsOpendir(rootDir); tdir = tfsOpendir(rootDir);
if (tdir == NULL) { if (tdir == NULL) {
tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
regfree(&regex);
return -1; return -1;
} }
while ((pf = tfsReaddir(tdir))) { while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname); tfsbasename(pf, bname);
if (strncmp(bname, "meta", sizeof("meta")) == 0) {
// TODO if (strcmp(bname, "data") == 0) {
break; // Skip the data/ directory
continue;
}
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) {
// Skip current.t file
tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
tfsremove(pf);
continue;
} }
int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) {
// Match
if (pfs->cstatus->pmf != NULL) {
tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tfsClosedir(tdir);
regfree(&regex);
return -1;
} else {
uint32_t version = 0;
if (strcmp(bname, "meta") != 0) {
sscanf(bname, "meta-ver%" PRIu32, &version);
pfs->cstatus->meta.version = version;
}
pfs->cstatus->pmf = &(pfs->cstatus->mf);
pfs->cstatus->pmf->f = *pf;
TSDB_FILE_SET_CLOSED(pfs->cstatus->pmf);
if (tsdbOpenMFile(pfs->cstatus->pmf, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno));
tfsClosedir(tdir);
regfree(&regex);
return -1;
}
if (tsdbLoadMFileHeader(pfs->cstatus->pmf, &(pfs->cstatus->pmf->info)) < 0) {
tsdbError("vgId:%d failed to restore meta since %s", REPO_ID(pRepo), tstrerror(terrno));
tfsClosedir(tdir);
regfree(&regex);
return -1;
}
tsdbCloseMFile(pfs->cstatus->pmf);
}
} else if (code == REG_NOMATCH) {
// Not match
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
tfsremove(pf);
continue;
} else {
// Has other error
tsdbError("vgId:%d failed to restore meta file while run regexec since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
tfsClosedir(tdir);
regfree(&regex);
return -1;
}
}
if (pfs->cstatus->pmf) {
tsdbInfo("vgId:%d meta file %s is restored", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pfs->cstatus->pmf));
} else {
tsdbInfo("vgId:%d no meta file is restored", REPO_ID(pRepo));
} }
tfsClosedir(tdir); tfsClosedir(tdir);
regfree(&regex);
return 0;
}
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL;
const TFILE *pf = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$";
SArray * fArray = NULL;
regex_t regex;
STsdbFS * pfs = REPO_FS(pRepo);
// Loop to recover dfile set
tsdbGetDataDir(REPO_ID(pRepo), dataDir); tsdbGetDataDir(REPO_ID(pRepo), dataDir);
// Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED);
fArray = taosArrayInit(1024, sizeof(TFILE));
if (fArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
regfree(&regex);
return -1;
}
tdir = tfsOpendir(dataDir); tdir = tfsOpendir(dataDir);
if (tdir == NULL) { if (tdir == NULL) {
tsdbError("vgId:%d failed to open dir %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
taosArrayDestroy(fArray);
regfree(&regex);
return -1; return -1;
} }
// TODO while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) {
if (taosArrayPush(fArray, (void *)pf) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tfsClosedir(tdir);
taosArrayDestroy(fArray);
regfree(&regex);
return -1;
}
} else if (code == REG_NOMATCH) {
// Not match
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
tfsremove(pf);
continue;
} else {
// Has other error
tsdbError("vgId:%d failed to restore DFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
tfsClosedir(tdir);
taosArrayDestroy(fArray);
regfree(&regex);
return -1;
}
}
tfsClosedir(tdir); tfsClosedir(tdir);
regfree(&regex);
// Sort the array according to file name
taosArraySort(fArray, tsdbComparTFILE);
size_t index = 0;
// Loop to recover each file set
for (;;) {
if (index >= taosArrayGetSize(fArray)) {
break;
}
SDFileSet fset = {0};
TSDB_FSET_SET_CLOSED(&fset);
// Loop to recover ONE fset
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if (index >= taosArrayGetSize(fArray)) {
tsdbError("vgId:%d incomplete DFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
taosArrayDestroy(fArray);
return -1;
}
pf = taosArrayGet(fArray, index);
int tvid, tfid;
TSDB_FILE_T ttype;
uint32_t tversion;
tsdbParseDFilename(TFILE_NAME(pf), &tvid, &tfid, &ttype, &tversion);
ASSERT(tvid == REPO_ID(pRepo));
if (ftype == 0) {
fset.fid = tfid;
} else {
if (tfid != fset.fid) {
tsdbError("vgId:%d incomplete dFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
taosArrayDestroy(fArray);
return -1;
}
}
if (ttype != ftype) {
tsdbError("vgId:%d incomplete dFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
taosArrayDestroy(fArray);
return -1;
}
pDFile->f = *pf;
if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno));
taosArrayDestroy(fArray);
return -1;
}
if (tsdbLoadDFileHeader(pDFile, &(pDFile->info)) < 0) {
tsdbError("vgId:%d failed to load DFile %s header since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
tstrerror(terrno));
taosArrayDestroy(fArray);
return -1;
}
tsdbCloseDFile(pDFile);
}
taosArrayPush(pfs->cstatus->df, &fset);
}
// Resource release
taosArrayDestroy(fArray);
return 0; return 0;
}
static int tsdbRestoreCurrent(STsdbRepo *pRepo) {
// Loop to recover mfile
if (tsdbRestoreMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// Loop to recover dfile set
if (tsdbRestoreDFileSet(pRepo) < 0) {
tsdbError("vgId:%d failed to restore DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
return 0;
}
static int tsdbComparTFILE(const void *arg1, const void *arg2) {
TFILE *pf1 = (TFILE *)arg1;
TFILE *pf2 = (TFILE *)arg2;
int vid1, fid1, vid2, fid2;
TSDB_FILE_T ftype1, ftype2;
uint32_t version1, version2;
tsdbParseDFilename(TFILE_NAME(pf1), &vid1, &fid1, &ftype1, &version1);
tsdbParseDFilename(TFILE_NAME(pf2), &vid2, &fid2, &ftype2, &version2);
if (fid1 < fid2) {
return -1;
} else if (fid1 > fid2) {
return 1;
} else {
if (ftype1 < ftype2) {
return -1;
} else if (ftype1 > ftype2) {
return 1;
} else {
return 0;
}
}
} }
\ No newline at end of file
...@@ -351,6 +351,23 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) { ...@@ -351,6 +351,23 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) {
return 0; return 0;
} }
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
ASSERT(TSDB_FILE_OPENED(pDFile));
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
return -1;
}
if (tsdbReadDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
return -1;
}
tsdbDecodeDFInfo(buf, pInfo);
return 0;
}
static int tsdbScanAndTryFixDFile(SDFile *pDFile) { static int tsdbScanAndTryFixDFile(SDFile *pDFile) {
struct stat dfstat; struct stat dfstat;
SDFile df = *pDFile; SDFile df = *pDFile;
...@@ -558,6 +575,23 @@ int tsdbScanAndTryFixDFileSet(SDFileSet *pSet) { ...@@ -558,6 +575,23 @@ int tsdbScanAndTryFixDFileSet(SDFileSet *pSet) {
return 0; return 0;
} }
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version) {
char *p = NULL;
*version = 0;
*ftype = TSDB_FILE_MAX;
sscanf(fname, "v%df%d.%m[a-z]-ver%" PRIu32, vid, fid, &p, version);
for (TSDB_FILE_T i = 0; i < TSDB_FILE_MAX; i++) {
if (strcmp(p, TSDB_FNAME_SUFFIX[i]) == 0) {
*ftype = i;
break;
}
}
free(p);
return 0;
}
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) { static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) {
ASSERT(ftype != TSDB_FILE_MAX); ASSERT(ftype != TSDB_FILE_MAX);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册