diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index e94cc54d8917a11e5ca5b39c7b9c0f2a148dc937..0ce9850b0d4152431ab769a8596cee62cd09a0a8 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -22,20 +22,24 @@ * // TODO update date and add release version. */ typedef enum { - TSDB_FS_VERSION_0, - TSDB_FS_VERSION_1, -} ETsdbFsVersion; + TSDB_FS_VER_0, + TSDB_FS_VER_1, +} ETsdbFsVer; -static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T ftype) { // for DFile - switch (ftype) { +#define TSDB_FVER_TYPE uint32_t + +static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile + switch (fType) { case TSDB_FILE_HEAD: - return TSDB_FS_VERSION_1; + return TSDB_FS_VER_1; default: - return TSDB_FS_VERSION_0; + return TSDB_FS_VER_0; } } -static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VERSION_1; } // for current +static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VER_1; } // latest version for current + +int tsdbRefactorFS(STsdbRepo *pRepo); // ================== TSDB global config extern bool tsdbForceKeepFile; diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 25016c53375730f3264e0cd92717badc816dcc3e..54d9bbaec21c471e8bcc06df88e4507bf9dc1be8 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -168,6 +168,7 @@ typedef struct { uint32_t offset; uint64_t size; uint64_t tombSize; + uint32_t fver; } SDFInfo; typedef struct { @@ -175,16 +176,15 @@ typedef struct { TFILE f; int fd; uint8_t state; - uint32_t fver; + } SDFile; void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype); void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile); -void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); +void* tsdbDecodeSDFile(void* buf, SDFile* pDFile, uint32_t sfver); int tsdbCreateDFile(SDFile* pDFile, bool updateHeader, TSDB_FILE_T ftype); int tsdbUpdateDFileHeader(SDFile* 
pDFile); -int tsdbUpdateDFileHeaderEx(SDFile* pDFile, uint32_t fver); int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo); int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version); @@ -319,7 +319,7 @@ typedef struct { void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); -void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, bool containNFiles); +void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver); int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet); void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 613f6490fe3011169f25af207f10dad9985275a3..a06cc0f8710b955b4c4fa9a7a88262836e46bb36 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -35,6 +35,7 @@ typedef struct { TSKEY maxKey; } SBlockIdx; +#if 0 typedef struct { int64_t last : 1; int64_t offset : 63; @@ -46,14 +47,51 @@ typedef struct { int16_t numOfCols; // not including timestamp column TSKEY keyFirst; TSKEY keyLast; -#ifdef __TD_6117__ - int64_t hasAggr : 1; - int64_t blkVer : 7; - int64_t aggrOffset : 56; - int32_t aggrLen; + } SBlock; #endif -} SBlock; + +/** + * int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols + * int16_t numOfCols; // not including timestamp column + */ +#define SBlockFieldsP0 \ + int64_t last : 1; \ + int64_t offset : 63; \ + int32_t algorithm : 8; \ + int32_t numOfRows : 24; \ + int32_t len; \ + int32_t keyLen; \ + int16_t numOfSubBlocks; \ + int16_t numOfCols; \ + TSKEY keyFirst; \ + TSKEY keyLast + +#define SBlockFieldsP1 \ + int64_t hasAggr : 1; \ + int64_t blkVer : 7; \ + int64_t aggrOffset : 56; \ + int32_t aggrLen + +typedef struct { + SBlockFieldsP0; +} 
SBlockV0; +typedef struct { + SBlockFieldsP0; + SBlockFieldsP1; +} SBlockV1; + +typedef enum { + TSDB_SBLK_VER_0, + TSDB_SBLK_VER_1, +} ESBlockVer; + +#define SBlockVerLatest TSDB_SBLK_VER_1 + +#define SBlockBase SBlockV0 // base SBlock definition +#define SBlock SBlockV1 // latest SBlock definition + +// latest SBlockInfo definition typedef struct { int32_t delimiter; // For recovery usage int32_t tid; @@ -61,20 +99,40 @@ typedef struct { SBlock blocks[]; } SBlockInfo; +// definition of SBlockInfoV{#version} +#define SBlockInfoV(version) \ + typedef struct { \ + int32_t delimiter; \ + int32_t tid; \ + uint64_t uid; \ + SBlockV##version blocks[]; \ + } SBlockInfoV##version; + typedef struct { int16_t colId; int32_t len; uint32_t type : 8; uint32_t offset : 24; - // int64_t sum; - // int64_t max; - // int64_t min; - // int16_t maxIndex; - // int16_t minIndex; - // int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; uint8_t offsetH; char padding[1]; -} SBlockCol; +} SBlockColV0; +typedef struct { + int16_t colId; + int32_t len; + uint32_t type : 8; + uint32_t offset : 24; + uint8_t offsetH; + char padding[1]; +} SBlockColV1; + +#define SBlockColBase SBlockColV0 // base SBlockCol definition +#define SBlockCol SBlockColV1 // latest SBlockCol definition typedef struct { int16_t colId; @@ -118,7 +176,7 @@ struct SReadH { STable * pTable; // table to read SBlockIdx * pBlkIdx; // current reading table SBlockIdx int cidx; - SBlockInfo *pBlkInfo; + SBlockInfo * pBlkInfo; // SBlockInfoV# SBlockData *pBlkData; // Block info SAggrBlkData *pAggrBlkData; // Aggregate Block info SDataCols * pDCols[2]; @@ -148,7 +206,7 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); void tsdbCloseAndUnsetFSet(SReadH *pReadh); int tsdbLoadBlockIdx(SReadH *pReadh); int tsdbSetReadTable(SReadH *pReadh, STable *pTable); -int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); +int tsdbLoadBlockInfo(SReadH *pReadh, 
void **pTarget, int32_t *extendedLen); int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); @@ -177,4 +235,20 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { return 0; } +FORCE_INLINE int tsdbInitReadHBlkIdx(SReadH *pReadh, STsdbRepo *pRepo) { + ASSERT(pReadh != NULL && pRepo != NULL); + + memset((void *)pReadh, 0, sizeof(*pReadh)); + pReadh->pRepo = pRepo; + TSDB_FSET_SET_INIT(TSDB_READ_FSET(pReadh)); + + pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (pReadh->aBlkIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + #endif /*_TD_TSDB_READ_IMPL_H_*/ diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index ed84c41a88aef20c393f3521cf9918d900f52cf5..41ccaad3163aecc1d481bd7034a3188d7c504745 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -949,7 +949,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { SBlock *pBlock; if (pCommith->readh.pBlkIdx) { - if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { + if (tsdbLoadBlockInfo(&(pCommith->readh), NULL, NULL) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index aed216903b70d067e4e922f23a58824ab9021d17..025e29ae4ea8dedabfc3cb20d59ab6f62921a027 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -360,7 +360,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { tsdbUnRefTable(pTh->pTable); } - pTh->pInfo = taosTZfree(pTh->pInfo); + // pTh->pInfo = taosTZfree(pTh->pInfo); + tfree(pTh->pInfo); } pComph->tbArray = taosArrayDestroy(pComph->tbArray); @@ -386,11 +387,11 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { pTh->bindex = *(pReadH->pBlkIdx); pTh->pBlkIdx = &(pTh->bindex); - if (tsdbMakeRoom((void 
**)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) { - return -1; - } - - if (tsdbLoadBlockInfo(pReadH, (void *)(pTh->pInfo)) < 0) { + // if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) { + // return -1; + // } + int32_t originLen = 0; + if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) { return -1; } } diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index b99dc9213760c88bb9c278885da5d68a4195dbcb..e749fdbdac407cea6f189937d2572cea0d0b78ef 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -90,7 +90,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { return tlen; } -static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFiles) { +static int tsdbDecodeDFileSetArray(void **originBuf, void *buf, SArray *pArray, SFSHeader *pSFSHeader) { uint64_t nset; SDFileSet dset; dset.nFiles = TSDB_FILE_MIN; // default value: .head/.data/.last @@ -98,11 +98,24 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFil taosArrayClear(pArray); buf = taosDecodeFixedU64(buf, &nset); + + if (pSFSHeader->version == TSDB_FS_VER_0) { + uint32_t extendedSize = pSFSHeader->len + nset * TSDB_FILE_MAX * sizeof(TSDB_FVER_TYPE); + if (taosTSizeof(*originBuf) < extendedSize) { + int ptrDistance = POINTER_DISTANCE(buf, *originBuf); + if (tsdbMakeRoom(originBuf, extendedSize) < 0) { + terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + return -1; + } + buf = POINTER_SHIFT(*originBuf, ptrDistance); + } + } + for (size_t i = 0; i < nset; i++) { - buf = tsdbDecodeDFileSet(buf, &dset, containNFiles); + buf = tsdbDecodeDFileSet(buf, &dset, pSFSHeader->version); taosArrayPush(pArray, (void *)(&dset)); } - return buf; + return TSDB_CODE_SUCCESS; } static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { @@ -116,14 +129,12 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { return tlen; } -static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus, bool containNFiles) { 
+static int tsdbDecodeFSStatus(void **originBuf, void *buf, SFSStatus *pStatus, SFSHeader *pSFSHeader) { tsdbResetFSStatus(pStatus); pStatus->pmf = &(pStatus->mf); buf = tsdbDecodeSMFile(buf, pStatus->pmf); - buf = tsdbDecodeDFileSetArray(buf, pStatus->df, containNFiles); - - return buf; + return tsdbDecodeDFileSetArray(originBuf, buf, pStatus->df, pSFSHeader); } static SFSStatus *tsdbNewFSStatus(int maxFSet) { @@ -330,6 +341,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) { return -1; } + // add switch to control + if (tsdbRefactorFS(pRepo) < 0) { + tsdbError("vgId:%d failed to refactor FS since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + // Load meta cache if has meta file if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) { tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -690,7 +707,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { ptr = tsdbDecodeFSHeader(ptr, &fsheader); ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta)); - if (fsheader.version != TSDB_FS_VERSION_0) { + if (fsheader.version != TSDB_FS_VER_0) { // TODO: handle file version change } @@ -719,7 +736,9 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { } ptr = buffer; - ptr = tsdbDecodeFSStatus(ptr, pStatus, fsheader.version == TSDB_FS_VERSION_0 ? 
false : true); + if (tsdbDecodeFSStatus(&buffer, ptr, pStatus, &fsheader) < 0) { + goto _err; + } } else { tsdbResetFSStatus(pStatus); } diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 89ab094395e24bef62da71d5bf47bba4970625d5..50d70804a444850fbf8e9935e37c92baf0e54afc 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = { static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); static int tsdbRollBackMFile(SMFile *pMFile); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); -static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); +static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo, TSDB_FVER_TYPE sfver); static int tsdbRollBackDFile(SDFile *pDFile); // ============== SMFile @@ -302,6 +302,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, memset(&(pDFile->info), 0, sizeof(pDFile->info)); pDFile->info.magic = TSDB_FILE_INIT_MAGIC; + pDFile->info.fver = tsdbGetDFSVersion(ftype); tsdbGetFilename(vid, fid, ver, ftype, fname); tfsInitFile(&(pDFile->f), did.level, did.id, fname); @@ -321,8 +322,8 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) { return tlen; } -void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) { - buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); +void *tsdbDecodeSDFile(void *buf, SDFile *pDFile, uint32_t sfver) { + buf = tsdbDecodeDFInfo(buf, &(pDFile->info), sfver); buf = tfsDecodeFile(buf, &(pDFile->f)); TSDB_FILE_SET_CLOSED(pDFile); @@ -341,7 +342,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) { static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { char *aname; - buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); + buf = tsdbDecodeDFInfo(buf, &(pDFile->info), tsdbGetSFSVersion()); buf = taosDecodeString(buf, &aname); strncpy(TSDB_FILE_FULL_NAME(pDFile), aname, TSDB_FILENAME_LEN); TSDB_FILE_SET_CLOSED(pDFile); @@ -353,7 +354,7 @@ static void 
*tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); - pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755); + pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755); if (pDFile->fd < 0) { if (errno == ENOENT) { // Try to create directory recursively @@ -364,7 +365,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { } tfree(s); - pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755); + pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755); if (pDFile->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -380,8 +381,9 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { } pDFile->info.size += TSDB_FILE_HEAD_SIZE; + pDFile->info.fver = tsdbGetDFSVersion(fType); - if (tsdbUpdateDFileHeaderEx(pDFile, tsdbGetDFSVersion(fType)) < 0) { + if (tsdbUpdateDFileHeader(pDFile) < 0) { tsdbCloseDFile(pDFile); tsdbRemoveDFile(pDFile); return -1; @@ -390,53 +392,14 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { return 0; } -// keep the fver in DFileHeader(e.g. 
during fs check in openFS) int tsdbUpdateDFileHeader(SDFile *pDFile) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { - tsdbDebug("prop:file %s seek to read fail", TSDB_FILE_FULL_NAME(pDFile)); - return -1; - } - - if (tsdbReadDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) { - tsdbDebug("prop:file %s read fail, fd is %d", TSDB_FILE_FULL_NAME(pDFile), pDFile->fd); - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tsdbDebug("prop:file %s checksum fail", TSDB_FILE_FULL_NAME(pDFile)); - return -1; - } - - void *ptr = POINTER_SHIFT(buf, sizeof(uint32_t)); - tsdbEncodeDFInfo(&ptr, &(pDFile->info)); - - taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); - - if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { - tsdbDebug("prop:file %s seek to write fail", TSDB_FILE_FULL_NAME(pDFile)); - return -1; - } - - if (tsdbWriteDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) { - tsdbDebug("prop:file %s write fail", TSDB_FILE_FULL_NAME(pDFile)); - return -1; - } - - return 0; -} -// update the fver in DFileHeader -int tsdbUpdateDFileHeaderEx(SDFile *pDFile, uint32_t fver) { - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { return -1; } void *ptr = buf; - taosEncodeFixedU32(&ptr, fver); tsdbEncodeDFInfo(&ptr, &(pDFile->info)); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); @@ -467,8 +430,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { } void *pBuf = buf; - pBuf = taosDecodeFixedU32(pBuf, &(pDFile->fver)); - pBuf = tsdbDecodeDFInfo(pBuf, pInfo); + pBuf = tsdbDecodeDFInfo(pBuf, pInfo, TSDB_FS_VER_1); return 0; } @@ -526,7 +488,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { int tlen = 0; - + tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, 
pInfo->len); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); @@ -538,7 +500,12 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { return tlen; } -static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { +static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo, TSDB_FVER_TYPE sfver) { + if (sfver > TSDB_FS_VER_0) { + buf = taosDecodeFixedU32(buf, &(pInfo->fver)); + } else { + pInfo->fver = TSDB_FS_VER_0; // default value + } buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); @@ -620,25 +587,27 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { tlen += taosEncodeFixedI32(buf, pSet->fid); tlen += taosEncodeFixedU8(buf, pSet->nFiles); - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } return tlen; } -void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, bool containNFiles) { +void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, uint32_t sfver) { int32_t fid; buf = taosDecodeFixedI32(buf, &(fid)); pSet->state = 0; pSet->fid = fid; - if (containNFiles) { + + if (sfver > TSDB_FS_VER_0) { buf = taosDecodeFixedU8(buf, &(pSet->nFiles)); } + ASSERT(TSDB_FSET_NFILES_VALID(pSet)); for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { - buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); + buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype), sfver); } return buf; } @@ -647,7 +616,8 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pSet->fid); - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + tlen += taosEncodeFixedU8(buf, pSet->nFiles); + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } @@ -658,8 +628,9 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet 
*pSet) { int32_t fid; buf = taosDecodeFixedI32(buf, &(fid)); + buf = taosDecodeFixedU8(buf, &(pSet->nFiles)); pSet->fid = fid; - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } return buf; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b2e6fe89161d0e9bceaf74a46807f51ec402fb2a..e917a5d4fea3d2b2959eb6f758a1cd6a26dc1543 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -673,7 +673,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea tdInitDataRow(memRowDataBody(row), pSchema); // first load block index info - if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { err = -1; goto out; } @@ -775,7 +775,7 @@ out: static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { ASSERT(pTable->lastRow == NULL); - if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { return -1; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 14c5a04eced0597c26d34abab1e1cfbc3cd93482..d561039ed5cb5a1397bf86b17246ce28799dc03c 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1086,21 +1086,23 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int return 0; // no data blocks in the file belongs to pCheckInfo->pTable } - if (pCheckInfo->compSize < (int32_t)compIndex->len) { - assert(compIndex->len > 0); + assert(compIndex->len > 0); - char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); - if (t == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - code = TSDB_CODE_TDB_OUT_OF_MEMORY; - return code; - } + // if (pCheckInfo->compSize < (int32_t)compIndex->len) { + // assert(compIndex->len > 0); - pCheckInfo->pCompInfo = (SBlockInfo*)t; - pCheckInfo->compSize = compIndex->len; - } + // char* t = 
realloc(pCheckInfo->pCompInfo, compIndex->len); + // if (t == NULL) { + // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + // code = TSDB_CODE_TDB_OUT_OF_MEMORY; + // return code; + // } + + // pCheckInfo->pCompInfo = (SBlockInfo*)t; + // pCheckInfo->compSize = compIndex->len; + // } - if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) { + if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo), &pCheckInfo->compSize) < 0) { return terrno; } SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 3835c8e11c6dc436927a578f6343ec62716d3dbb..4b521eef46d048b9078c2372aeb293c91f5ec533 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -33,7 +33,8 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { memset((void *)pReadh, 0, sizeof(*pReadh)); pReadh->pRepo = pRepo; - TSDB_FSET_SET_INIT(TSDB_READ_FSET(pReadh)); + + TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh)); pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); if (pReadh->aBlkIdx == NULL) { @@ -199,6 +200,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { return 0; } +#if 0 int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { ASSERT(pReadh->pBlkIdx != NULL); @@ -242,6 +244,124 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { return 0; } +#endif + +static FORCE_INLINE int32_t tsdbGetSBlockVer(int32_t fver) { + switch (fver) { + case TSDB_FS_VER_0: + return TSDB_SBLK_VER_0; + case TSDB_FS_VER_1: + default: + return TSDB_SBLK_VER_1; + } +} + +static FORCE_INLINE size_t tsdbSizeOfSBlock(int32_t sBlkVer) { + switch (sBlkVer) { + case TSDB_SBLK_VER_0: + return sizeof(SBlockV0); + case TSDB_SBLK_VER_1: + return sizeof(SBlockV1); + default: + return sizeof(SBlock); + } +} + +static int tsdbHeadRefactor(SDFile *pHeadf, SBlockInfo *pSrcBlkInfo, uint32_t srcBlkInfoLen, SBlockInfo **pDstBlkInfo, + int32_t *dstBlkInfoLen) { + int sBlkVer = 
tsdbGetSBlockVer(pHeadf->info.fver); + if (sBlkVer == SBlockVerLatest) { + *pDstBlkInfo = pSrcBlkInfo; + *dstBlkInfoLen = srcBlkInfoLen; + return TSDB_CODE_SUCCESS; + } + uint32_t originBlkSize = tsdbSizeOfSBlock(sBlkVer); + int nBlks = (srcBlkInfoLen - sizeof(SBlockInfo)) / originBlkSize; + + *dstBlkInfoLen = sizeof(SBlockInfo) + nBlks * sizeof(SBlock); + + if (srcBlkInfoLen == *dstBlkInfoLen) { + *pDstBlkInfo = pSrcBlkInfo; + return TSDB_CODE_SUCCESS; + } + + ASSERT(*dstBlkInfoLen >= srcBlkInfoLen); + + if (tsdbMakeRoom((void **)(pDstBlkInfo), *dstBlkInfoLen) < 0) return -1; + memset(*pDstBlkInfo, 0, *dstBlkInfoLen); // the blkVer is set to 0 + memcpy(*pDstBlkInfo, pSrcBlkInfo, sizeof(SBlockInfo)); // copy header + for (int i = 0; i < nBlks; ++i) { + memcpy((*pDstBlkInfo)->blocks + i, POINTER_SHIFT(pSrcBlkInfo->blocks, i * originBlkSize), originBlkSize); + // TODO: update the fields if the SBlock definition changed later + } + taosTZfree(pSrcBlkInfo); + return TSDB_CODE_SUCCESS; +} + +int tsdbLoadBlockInfo(SReadH *pReadh, void **pTarget, int32_t *extendedLen) { + ASSERT(pReadh->pBlkIdx != NULL); + + SDFile * pHeadf = TSDB_READ_HEAD_FILE(pReadh); + SBlockIdx * pBlkIdx = pReadh->pBlkIdx; + SBlockInfo *pBlkInfo = NULL; + + if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len); + return -1; + } + + if (tsdbMakeRoom((void **)(&pBlkInfo), pBlkIdx->len) < 0) return -1; + + int64_t nread = tsdbReadDFile(pHeadf, (void *)pBlkInfo, pBlkIdx->len); + if (nread < 0) { + tsdbError("vgId:%d failed to load SBlockInfo part while read file %s since %s, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len); + taosTZfree(pBlkInfo); + return -1; + } + + if (nread < pBlkIdx->len) { + terrno = 
TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, nread); + taosTZfree(pBlkInfo); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)pBlkInfo, pBlkIdx->len)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len); + taosTZfree(pBlkInfo); + return -1; + } + + ASSERT(pBlkIdx->tid == pBlkInfo->tid && pBlkIdx->uid == pBlkInfo->uid); + + int32_t dstBlkInfoLen = 0; + if (tsdbHeadRefactor(pHeadf, pBlkInfo, pBlkIdx->len, &(pReadh->pBlkInfo), &dstBlkInfoLen) < 0) { + taosTZfree(pBlkInfo); + return -1; + } + + if (extendedLen != NULL) { + if (pTarget != NULL) { + if (*extendedLen < dstBlkInfoLen) { + char *t = realloc(*pTarget, dstBlkInfoLen); + if (t == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + *pTarget = t; + } + memcpy(*pTarget, (void *)(pReadh->pBlkInfo), dstBlkInfoLen); + } + *extendedLen = dstBlkInfoLen; + } + + return TSDB_CODE_SUCCESS; +} int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ASSERT(pBlock->numOfSubBlocks > 0); diff --git a/src/tsdb/src/tsdbRecover.c b/src/tsdb/src/tsdbRecover.c index 6dea4a4e57392be988126c579648f39a8270b9bf..0e575a95a9b9e74d21dfcd12b9d494cc05a8e2e5 100644 --- a/src/tsdb/src/tsdbRecover.c +++ b/src/tsdb/src/tsdbRecover.c @@ -11,4 +11,366 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . 
- */ \ No newline at end of file + */ + +#include "tsdbint.h" + +typedef struct { + SReadH readh; + SDFileSet wSet; + SArray * aBlkIdx; // SBlockIdx array + SArray * aSupBlk; // Table super-block array + SArray * aSubBlk; // table sub-block array +} SRecoverH; + +#define TSDB_RECOVER_WFSET(rh) (&((rh)->wSet)) +#define TSDB_RECOVER_WHEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_RECOVER_WFSET(rh), TSDB_FILE_HEAD) +#define TSDB_RECOVER_WDATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_RECOVER_WFSET(rh), TSDB_FILE_DATA) +#define TSDB_RECOVER_WLAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_RECOVER_WFSET(rh), TSDB_FILE_LAST) + +static int tsdbInitRecoverH(SRecoverH *pRecoverH, STsdbRepo *pRepo); +static int tsdbDestoryRecoverH(SRecoverH *pRecoverH); +static int tsdbInitHFile(STsdbRepo *pRepo, SDFile *pDestDFile, const SDFile *pSrcDFile, int fid); +static int tsdbDestroyHFile(SDFile *pDFile); +static int tsdbHeadWriteBlockInfo(SRecoverH *pRecoverH); +static int tsdbHeadWriteBlockIdx(SRecoverH *pRecoverH); +static int tsdbHeadAddBlock(SRecoverH *pRecoverH, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); + +static int tsdbInitRecoverH(SRecoverH *pRecoverH, STsdbRepo *pRepo) { + memset(pRecoverH, 0, sizeof(SRecoverH)); + + // Init read handle + if (tsdbInitReadH(&(pRecoverH->readh), pRepo) < 0) { + return -1; + } + + pRecoverH->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (pRecoverH->aBlkIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestoryRecoverH(pRecoverH); + return -1; + } + + pRecoverH->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pRecoverH->aSupBlk == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestoryRecoverH(pRecoverH); + return -1; + } + + pRecoverH->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pRecoverH->aSubBlk == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestoryRecoverH(pRecoverH); + return -1; + } + return 0; +} + +static int tsdbDestoryRecoverH(SRecoverH *pRecoverH) { + pRecoverH->aSubBlk = 
taosArrayDestroy(pRecoverH->aSubBlk); + pRecoverH->aSupBlk = taosArrayDestroy(pRecoverH->aSupBlk); + pRecoverH->aBlkIdx = taosArrayDestroy(pRecoverH->aBlkIdx); + tsdbDestroyReadH(&(pRecoverH->readh)); + return 0; +} + +static int tsdbHeadWriteBlockInfo(SRecoverH *pRecoverH) { + SReadH * pReadH = &pRecoverH->readh; + SDFile * pWHeadf = TSDB_RECOVER_WHEAD_FILE(pRecoverH); + SBlockIdx * pBlkIdx = pReadH->pBlkIdx; + SBlockIdx blkIdx; + SBlock * pBlock = NULL; + uint32_t nSupBlocks = (uint32_t)taosArrayGetSize(pRecoverH->aSupBlk); + uint32_t nSubBlocks = (uint32_t)taosArrayGetSize(pRecoverH->aSubBlk); + uint32_t tlen = 0; + SBlockInfo *pBlkInfo = NULL; + int64_t offset = 0; + + if (nSupBlocks <= 0) { + // No data (data all deleted) + return 0; + } + + tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); + + // Write SBlockInfo part + if (tsdbMakeRoom((void **)(&(TSDB_READ_BUF(pReadH))), tlen) < 0) { + return -1; + } + pBlkInfo = TSDB_READ_BUF(pReadH); + + pBlkInfo->delimiter = TSDB_FILE_DELIMITER; + pBlkInfo->tid = pBlkIdx->tid; + pBlkInfo->uid = pBlkIdx->uid; + + memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pRecoverH->aSupBlk, 0), nSupBlocks * sizeof(SBlock)); + if (nSubBlocks > 0) { + memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pRecoverH->aSubBlk, 0), nSubBlocks * sizeof(SBlock)); + + for (uint32_t i = 0; i < nSupBlocks; ++i) { + pBlock = pBlkInfo->blocks + i; + + if (pBlock->numOfSubBlocks > 1) { + pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); + } + } + } + + taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + + if (tsdbAppendDFile(pWHeadf, TSDB_READ_BUF(pReadH), tlen, &offset) < 0) { + return -1; + } + + tsdbUpdateDFileMagic(pWHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + + // Set blkIdx + pBlock = taosArrayGet(pRecoverH->aSupBlk, nSupBlocks - 1); + + blkIdx.tid = pBlkIdx->tid; + blkIdx.uid = pBlkIdx->uid; + blkIdx.hasLast = pBlock->last ? 
1 : 0; + blkIdx.maxKey = pBlock->keyLast; + blkIdx.numOfBlocks = nSupBlocks; + blkIdx.len = tlen; + blkIdx.offset = (uint32_t)offset; + + ASSERT(blkIdx.numOfBlocks > 0); + + if (taosArrayPush(pRecoverH->aBlkIdx, (void *)(&blkIdx)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static int tsdbHeadWriteBlockIdx(SRecoverH *pRecoverH) { + SReadH * pReadH = &pRecoverH->readh; + SDFile * pWHeadf = TSDB_RECOVER_WHEAD_FILE(pRecoverH); + SBlockIdx *pBlkIdx = NULL; + uint32_t nidx = (uint32_t)taosArrayGetSize(pRecoverH->aBlkIdx); + int tlen = 0, size = 0; + int64_t offset = 0; + + if (nidx <= 0) { + // All data are deleted + pWHeadf->info.offset = 0; + pWHeadf->info.len = 0; + return 0; + } + + for (uint32_t i = 0; i < nidx; ++i) { + pBlkIdx = (SBlockIdx *)taosArrayGet(pRecoverH->aBlkIdx, i); + + size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); + if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadH)), tlen + size) < 0) return -1; + + void *ptr = POINTER_SHIFT(TSDB_READ_BUF(pReadH), tlen); + tsdbEncodeSBlockIdx(&ptr, pBlkIdx); + + tlen += size; + } + + tlen += sizeof(TSCKSUM); + if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadH)), tlen) < 0) return -1; + taosCalcChecksumAppend(0, (uint8_t *)TSDB_READ_BUF(pReadH), tlen); + + if (tsdbAppendDFile(pWHeadf, TSDB_READ_BUF(pReadH), tlen, &offset) < tlen) { + tsdbError("vgId:%d failed to write block index part to file %s since %s", REPO_ID(pReadH->pRepo), + TSDB_FILE_FULL_NAME(pWHeadf), tstrerror(terrno)); + return -1; + } + + tsdbUpdateDFileMagic(pWHeadf, POINTER_SHIFT(TSDB_READ_BUF(pReadH), tlen - sizeof(TSCKSUM))); + pWHeadf->info.offset = (uint32_t)offset; + pWHeadf->info.len = tlen; + + return 0; +} + +static int tsdbHeadAddBlock(SRecoverH *pRecoverH, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { + if (taosArrayPush(pRecoverH->aSupBlk, pSupBlock) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pSubBlocks && taosArrayAddBatch(pRecoverH->aSubBlk, 
pSubBlocks, nSubBlocks) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static int tsdbInitHFile(STsdbRepo *pRepo, SDFile *pDestDFile, const SDFile *pSrcDFile, int fid) { + SDiskID did; + did.level = pSrcDFile->f.level; + did.id = pSrcDFile->f.id; + + tsdbInitDFile(pDestDFile, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); + if (tsdbCreateDFile(pDestDFile, true, TSDB_FILE_HEAD) < 0) { + tsdbError("vgId:%d failed to create file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDestDFile), + tstrerror(terrno)); + return -1; + } + return TSDB_CODE_SUCCESS; +} + +static int tsdbDestroyHFile(SDFile *pDFile) { + tsdbCloseDFile(pDFile); + return tsdbRemoveDFile(pDFile); +} + +static int tsdbRefactorHeadF(STsdbRepo *pRepo, SRecoverH *pRecoverH, SDFileSet *pSet, int32_t *nRemain) { + SDFile *pHeadF = TSDB_DFILE_IN_SET(pSet, TSDB_FILE_HEAD); + if (pHeadF->info.fver == tsdbGetDFSVersion(TSDB_FILE_HEAD)) { + if (taosArrayPush(REPO_FS(pRepo)->nstatus->df, pSet) == NULL) { + terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + return -1; + } + ++*nRemain; + return TSDB_CODE_SUCCESS; + } + + SReadH *pReadH = &(pRecoverH->readh); + + if (tsdbSetAndOpenReadFSet(pReadH, pSet) < 0) { + return -1; + } + + if (tsdbLoadBlockIdx(pReadH) < 0) { + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + return -1; + } + + SDFile *pTmpHeadF = TSDB_RECOVER_WHEAD_FILE(pRecoverH); + if (tsdbInitHFile(pRepo, pTmpHeadF, pHeadF, pSet->fid) < 0) { + tsdbError("vgId:%d failed to init file %s to refactor since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pTmpHeadF), + tstrerror(terrno)); + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + return -1; + } + + int arraySize = taosArrayGetSize(pReadH->aBlkIdx); + SBlock supBlk; + for (int iBlkIdx = 0; iBlkIdx < arraySize; ++iBlkIdx) { + pReadH->pBlkIdx = taosArrayGet(pReadH->aBlkIdx, iBlkIdx); + pReadH->cidx = iBlkIdx; + + if (tsdbLoadBlockInfo(pReadH, NULL, NULL) < 0) { + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + 
tsdbDestroyHFile(pTmpHeadF); + return -1; + } + + // clear the reused resource + taosArrayClear(pRecoverH->aSupBlk); + taosArrayClear(pRecoverH->aSubBlk); + + for (uint32_t iSupBlk = 0; iSupBlk < pReadH->pBlkIdx->numOfBlocks; ++iSupBlk) { + SBlock *pSupBlk = pReadH->pBlkInfo->blocks + iSupBlk; + if (pSupBlk->numOfSubBlocks == 1) { + if (tsdbHeadAddBlock(pRecoverH, pSupBlk, NULL, 0) < 0) { + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + tsdbDestroyHFile(pTmpHeadF); + return -1; + } + } else { + supBlk = *pSupBlk; + supBlk.offset = sizeof(SBlock) * taosArrayGetSize(pRecoverH->aSubBlk); + if (tsdbHeadAddBlock(pRecoverH, &supBlk, POINTER_SHIFT(pReadH->pBlkInfo, pSupBlk->offset), + pSupBlk->numOfSubBlocks) < 0) { + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + tsdbDestroyHFile(pTmpHeadF); + return -1; + } + } + } + if (tsdbHeadWriteBlockInfo(pRecoverH) < 0) { + tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", REPO_ID(pReadH->pRepo), + TSDB_FILE_FULL_NAME(pTmpHeadF), tstrerror(terrno)); + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + tsdbDestroyHFile(pTmpHeadF); + return -1; + } + } + + if (tsdbHeadWriteBlockIdx(pRecoverH) < 0) { + tsdbError("vgId:%d failed to write SBlockIdx part into file %s since %s", REPO_ID(pReadH->pRepo), + TSDB_FILE_FULL_NAME(pTmpHeadF), tstrerror(terrno)); + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + tsdbDestroyHFile(pTmpHeadF); + return -1; + } + + if (tsdbUpdateDFileHeader(pTmpHeadF) < 0) { + tsdbError("vgId:%d failed to update header of file %s since %s", REPO_ID(pReadH->pRepo), + TSDB_FILE_FULL_NAME(pTmpHeadF), tstrerror(terrno)); + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + tsdbDestroyHFile(pTmpHeadF); + return -1; + } + + // resource release + tsdbCloseDFileSet(TSDB_READ_FSET(pReadH)); + + TSDB_FILE_FSYNC(pTmpHeadF); + tsdbCloseDFile(pTmpHeadF); + SDFileSet *pDestFSet = TSDB_READ_FSET(pReadH); + tsdbInitDFileEx(TSDB_DFILE_IN_SET(pDestFSet, TSDB_FILE_HEAD), pTmpHeadF); + + if 
(taosArrayPush(REPO_FS(pRepo)->nstatus->df, pDestFSet) == NULL) { + terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + return -1; + } + + return TSDB_CODE_SUCCESS; +} + +int tsdbRefactorFS(STsdbRepo *pRepo) { + STsdbFS * pfs = REPO_FS(pRepo); + SFSStatus *pStatus = pfs->cstatus; + size_t size = taosArrayGetSize(pStatus->df); + if (size <= 0) { + return TSDB_CODE_SUCCESS; + } + + SRecoverH recoverH; + if (tsdbInitRecoverH(&recoverH, pRepo) < 0) { + return -1; + } + + tsem_wait(&pRepo->readyToCommit); + + tsdbStartFSTxn(pRepo, 0, 0); + + int32_t nRemain = 0; + for (size_t i = 0; i < size; ++i) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i); + if (tsdbRefactorHeadF(pRepo, &recoverH, pSet, &nRemain) < 0) { + tsdbDestoryRecoverH(&recoverH); + tsdbEndFSTxnWithError(REPO_FS(pRepo)); + tsem_post(&pRepo->readyToCommit); + tsdbError("vgId:%d failed to refactor DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + } + + tsdbDestoryRecoverH(&recoverH); + if (nRemain == size) { + tsdbEndFSTxnWithError(REPO_FS(pRepo)); + } else { + if (pStatus != NULL) { + pfs->nstatus->mf = pStatus->mf; + pfs->nstatus->pmf = &pfs->nstatus->mf; + } + if (tsdbEndFSTxn(pRepo) < 0) { + tsem_post(&pRepo->readyToCommit); + return -1; + } + } + tsem_post(&pRepo->readyToCommit); + return TSDB_CODE_SUCCESS; +} \ No newline at end of file