提交 e6510847 编写于 作者: C Cary Xu

dfileset compatibility

上级 9c0f0525
......@@ -168,7 +168,6 @@ typedef struct {
uint32_t offset;
uint64_t size;
uint64_t tombSize;
uint32_t fver;
} SDFInfo;
typedef struct {
......@@ -176,6 +175,7 @@ typedef struct {
TFILE f;
int fd;
uint8_t state;
uint32_t fver;
} SDFile;
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
......
......@@ -42,16 +42,16 @@ typedef struct {
int32_t numOfRows : 24;
int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
#ifdef __TD_6117__
int64_t hasAggr : 1;
int64_t blkVer : 7;
int64_t aggrOffset : 56;
int32_t aggrLen;
#endif
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
} SBlock;
typedef struct {
......
......@@ -1166,6 +1166,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) {
return 0;
}
#if 0
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
const TFILE *pf = NULL;
SArray * fArray = NULL;
......@@ -1188,7 +1189,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
TSDB_FSET_SET_INIT(&fset);
// Loop to recover ONE fset
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX - 1; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);
if (index >= taosArrayGetSize(fArray)) {
......@@ -1268,6 +1269,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
}
tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid);
fset.nFiles = 3;
taosArrayPush(pfs->cstatus->df, &fset);
}
......@@ -1276,6 +1278,139 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
return 0;
}
#endif
#if 1
static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
const TFILE *pf = NULL;
SArray * fArray = NULL;
STsdbFS * pfs = REPO_FS(pRepo);
char dataDir[TSDB_FILENAME_LEN] = "\0";
size_t fArraySize = 0;
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
if (tsdbFetchTFileSet(pRepo, &fArray) < 0) {
tsdbError("vgId:%d failed to fetch TFileSet from %s to restore since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
return -1;
}
if ((fArraySize = taosArrayGetSize(fArray)) <= 0) {
tsdbInfo("vgId:%d size of DFileSet from %s is %" PRIu32, REPO_ID(pRepo), dataDir, (uint32_t)fArraySize);
return 0;
}
// Loop to recover each file set
SDFileSet fset = {0};
bool isOneFSetFinish = false;
// one fileset ends when (1) the array ends or (2) encounter different fid
for (size_t index = 0; index < fArraySize; ++index) {
int tvid = -1, tfid = -1;
TSDB_FILE_T ttype = TSDB_FILE_MAX;
uint32_t tversion = -1;
char bname[TSDB_FILENAME_LEN] = "\0";
pf = taosArrayGet(fArray, index);
tfsbasename(pf, bname);
tsdbParseDFilename(bname, &tvid, &tfid, &ttype, &tversion);
ASSERT(tvid == REPO_ID(pRepo));
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ttype);
if (tfid < pRepo->rtn.minFid) { // skip the file expired
continue;
}
if (index == 0) {
memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_INIT(&fset);
fset.nFiles = 1;
fset.fid = tfid;
pDFile->f = *pf;
isOneFSetFinish = false;
} else {
if (fset.fid == tfid) {
++fset.nFiles;
pDFile->f = *pf;
// (1) the array ends
if ((index == fArraySize - 1) && (fset.nFiles >= TSDB_FILE_MIN)) {
tsdbInfo("vgId:%d DFileSet %d is fetched, nFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, fset.nFiles);
isOneFSetFinish = true;
}
} else {
// (2) encounter different fid
if (fset.nFiles >= TSDB_FILE_MIN) {
tsdbInfo("vgId:%d DFileSet %d is fetched, nFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, fset.nFiles);
isOneFSetFinish = true;
} else {
// next FSet
memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_INIT(&fset);
fset.nFiles = 1;
fset.fid = tfid;
pDFile->f = *pf;
isOneFSetFinish = false;
continue;
}
}
}
if (isOneFSetFinish) {
for (TSDB_FILE_T ftype = 0; ftype < fset.nFiles; ++ftype) {
SDFile * pDFile1 = TSDB_DFILE_IN_SET(&fset, ftype);
if (tsdbOpenDFile(pDFile1, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
tstrerror(terrno));
taosArrayDestroy(fArray);
return -1;
}
if (tsdbLoadDFileHeader(pDFile1, &(pDFile1->info)) < 0) {
tsdbError("vgId:%d failed to load DFile %s header since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile1),
tstrerror(terrno));
taosArrayDestroy(fArray);
return -1;
}
if (tsdbForceKeepFile) {
struct stat tfstat;
// Get real file size
if (fstat(pDFile1->fd, &tfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosArrayDestroy(fArray);
return -1;
}
if (pDFile1->info.size != tfstat.st_size) {
int64_t tfsize = pDFile1->info.size;
pDFile1->info.size = tfstat.st_size;
tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile1), tfsize, pDFile1->info.size);
}
}
tsdbCloseDFile(pDFile1);
}
tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid);
taosArrayPush(pfs->cstatus->df, &fset);
// next FSet
memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_INIT(&fset);
fset.nFiles = 1;
fset.fid = tfid;
pDFile->f = *pf;
isOneFSetFinish = false;
}
}
// Resource release
taosArrayDestroy(fArray);
return 0;
}
#endif
static int tsdbRestoreCurrent(STsdbRepo *pRepo) {
// Loop to recover mfile
......
......@@ -302,7 +302,6 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(vid, fid, ver, ftype, fname);
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
......@@ -468,7 +467,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
}
void *pBuf = buf;
// pBuf = taosDecodeFixedU32(pBuf, &_version);
pBuf = taosDecodeFixedU32(pBuf, &(pDFile->fver));
pBuf = tsdbDecodeDFInfo(pBuf, pInfo);
return 0;
}
......@@ -528,7 +527,6 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
......@@ -541,7 +539,6 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
}
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
......
......@@ -411,6 +411,7 @@ static int walSMemRowCheck(SWalHead *pHead) {
pWalHead->len = pWalHead->len + lenExpand;
}
ASSERT((sizeof(SWalHead) + pWalHead->len) <= WAL_MAX_SIZE);
memcpy(pHead, pWalHead, sizeof(SWalHead) + pWalHead->len);
tfree(pWalHead);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册