提交 d6d1c1e2 编写于 作者: C Cary Xu

upgrade compatibility for pre-aggregate

上级 3b1c9e36
...@@ -22,20 +22,24 @@ ...@@ -22,20 +22,24 @@
* // TODO update date and add release version. * // TODO update date and add release version.
*/ */
typedef enum { typedef enum {
TSDB_FS_VERSION_0, TSDB_FS_VER_0,
TSDB_FS_VERSION_1, TSDB_FS_VER_1,
} ETsdbFsVersion; } ETsdbFsVer;
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T ftype) { // for DFile #define TSDB_FVER_TYPE uint32_t
switch (ftype) {
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) {
case TSDB_FILE_HEAD: case TSDB_FILE_HEAD:
return TSDB_FS_VERSION_1; return TSDB_FS_VER_1;
default: default:
return TSDB_FS_VERSION_0; return TSDB_FS_VER_0;
} }
} }
static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VERSION_1; } // for current static FORCE_INLINE uint32_t tsdbGetSFSVersion() { return TSDB_FS_VER_1; } // latest version for current
int tsdbRefactorFS(STsdbRepo *pRepo);
// ================== TSDB global config // ================== TSDB global config
extern bool tsdbForceKeepFile; extern bool tsdbForceKeepFile;
......
...@@ -168,6 +168,7 @@ typedef struct { ...@@ -168,6 +168,7 @@ typedef struct {
uint32_t offset; uint32_t offset;
uint64_t size; uint64_t size;
uint64_t tombSize; uint64_t tombSize;
uint32_t fver;
} SDFInfo; } SDFInfo;
typedef struct { typedef struct {
...@@ -175,16 +176,15 @@ typedef struct { ...@@ -175,16 +176,15 @@ typedef struct {
TFILE f; TFILE f;
int fd; int fd;
uint8_t state; uint8_t state;
uint32_t fver;
} SDFile; } SDFile;
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype); void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile, uint32_t sfver);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader, TSDB_FILE_T ftype); int tsdbCreateDFile(SDFile* pDFile, bool updateHeader, TSDB_FILE_T ftype);
int tsdbUpdateDFileHeader(SDFile* pDFile); int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbUpdateDFileHeaderEx(SDFile* pDFile, uint32_t fver);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo); int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version); int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
...@@ -319,7 +319,7 @@ typedef struct { ...@@ -319,7 +319,7 @@ typedef struct {
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver); void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, bool containNFiles); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet, uint32_t sfver);
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
......
...@@ -35,6 +35,7 @@ typedef struct { ...@@ -35,6 +35,7 @@ typedef struct {
TSKEY maxKey; TSKEY maxKey;
} SBlockIdx; } SBlockIdx;
#if 0
typedef struct { typedef struct {
int64_t last : 1; int64_t last : 1;
int64_t offset : 63; int64_t offset : 63;
...@@ -46,14 +47,51 @@ typedef struct { ...@@ -46,14 +47,51 @@ typedef struct {
int16_t numOfCols; // not including timestamp column int16_t numOfCols; // not including timestamp column
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
#ifdef __TD_6117__ } SBlock;
int64_t hasAggr : 1;
int64_t blkVer : 7;
int64_t aggrOffset : 56;
int32_t aggrLen;
#endif #endif
} SBlock;
/**
 * Fields shared by every SBlock version: both SBlockV0 and SBlockV1 start with this list.
 * The last field deliberately has no trailing ';' because the use site writes `SBlockFieldsP0;`.
 * NOTE(review): block arrays read from .head files are memcpy'd into these structs, so the field
 * order and widths presumably mirror the stored layout — confirm before reordering anything.
 *
 * int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
 * int16_t numOfCols; // not including timestamp column
 */
#define SBlockFieldsP0 \
int64_t last : 1; \
int64_t offset : 63; \
int32_t algorithm : 8; \
int32_t numOfRows : 24; \
int32_t len; \
int32_t keyLen; \
int16_t numOfSubBlocks; \
int16_t numOfCols; \
TSKEY keyFirst; \
TSKEY keyLast
/*
 * Fields that only SBlockV1 has; they are appended after SBlockFieldsP0.
 * As with SBlockFieldsP0, the last field has no ';' — the use site supplies it.
 * NOTE(review): judging by the names these locate a block's pre-aggregate data and carry the
 * block version — confirm against the writer side.
 */
#define SBlockFieldsP1 \
int64_t hasAggr : 1; \
int64_t blkVer : 7; \
int64_t aggrOffset : 56; \
int32_t aggrLen
// SBlock version 0: the common fields only.
typedef struct {
SBlockFieldsP0;
} SBlockV0;
// SBlock version 1: the common fields followed by the aggregate-related fields.
typedef struct {
SBlockFieldsP0;
SBlockFieldsP1;
} SBlockV1;
// Version tags for the SBlock layouts above; TSDB_SBLK_VER_<n> corresponds to SBlockV<n>.
typedef enum {
TSDB_SBLK_VER_0,
TSDB_SBLK_VER_1,
} ESBlockVer;
// Newest SBlock version; a block array already in this version needs no conversion after loading.
#define SBlockVerLatest TSDB_SBLK_VER_1
// Unversioned names used by the rest of the code resolve to the base / latest layouts.
#define SBlockBase SBlockV0 // base SBlock definition
#define SBlock SBlockV1 // latest SBlock definition
// latest SBlockInfo definition
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t tid; int32_t tid;
...@@ -61,20 +99,40 @@ typedef struct { ...@@ -61,20 +99,40 @@ typedef struct {
SBlock blocks[]; SBlock blocks[];
} SBlockInfo; } SBlockInfo;
// definition of SBlockInfoV{#version}
// SBlockInfoV(n) expands to a complete typedef named SBlockInfoV<n>: the same delimiter/tid/uid
// header as SBlockInfo, followed by a flexible array of SBlockV<n> entries.
// The expansion already ends with ';', so an invocation needs no extra terminator.
#define SBlockInfoV(version) \
typedef struct { \
int32_t delimiter; \
int32_t tid; \
uint64_t uid; \
SBlockV##version blocks[]; \
} SBlockInfoV##version;
typedef struct { typedef struct {
int16_t colId; int16_t colId;
int32_t len; int32_t len;
uint32_t type : 8; uint32_t type : 8;
uint32_t offset : 24; uint32_t offset : 24;
// int64_t sum; int64_t sum;
// int64_t max; int64_t max;
// int64_t min; int64_t min;
// int16_t maxIndex; int16_t maxIndex;
// int16_t minIndex; int16_t minIndex;
// int16_t numOfNull; int16_t numOfNull;
uint8_t offsetH; uint8_t offsetH;
char padding[1]; char padding[1];
} SBlockCol; } SBlockColV0;
// Column descriptor, version 1: keeps colId/len/type/offset/offsetH/padding but has none of the
// per-column statistics members (sum, max, min, maxIndex, minIndex, numOfNull) that SBlockColV0 carries.
typedef struct {
int16_t colId;
int32_t len;
uint32_t type : 8;
uint32_t offset : 24;
uint8_t offsetH;
char padding[1];
} SBlockColV1;
// Unversioned names resolve to the base / latest column descriptor.
#define SBlockColBase SBlockColV0 // base SBlockCol definition
#define SBlockCol SBlockColV1 // latest SBlockCol definition
typedef struct { typedef struct {
int16_t colId; int16_t colId;
...@@ -118,7 +176,7 @@ struct SReadH { ...@@ -118,7 +176,7 @@ struct SReadH {
STable * pTable; // table to read STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx; int cidx;
SBlockInfo *pBlkInfo; SBlockInfo * pBlkInfo; // SBlockInfoV#
SBlockData *pBlkData; // Block info SBlockData *pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2]; SDataCols * pDCols[2];
...@@ -148,7 +206,7 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); ...@@ -148,7 +206,7 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void tsdbCloseAndUnsetFSet(SReadH *pReadh); void tsdbCloseAndUnsetFSet(SReadH *pReadh);
int tsdbLoadBlockIdx(SReadH *pReadh); int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); int tsdbLoadBlockInfo(SReadH *pReadh, void **pTarget, int32_t *extendedLen);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds); int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
...@@ -177,4 +235,20 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { ...@@ -177,4 +235,20 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0; return 0;
} }
/**
 * Initialise a read handle with only its block-index array allocated.
 *
 * The handle is zeroed, bound to pRepo, its file set is put into the initial state and
 * aBlkIdx is created with room for 1024 SBlockIdx entries. No other buffer of the handle
 * is allocated here.
 *
 * Declared `static` like the other inline helpers in this header: a header-defined inline
 * function with external linkage either lacks an external definition (C99/C11 inline rules)
 * or is emitted in every translation unit (gnu89 rules), and both break at link time.
 *
 * @param pReadh  handle to initialise; must not be NULL
 * @param pRepo   repository the handle reads from; must not be NULL
 * @return 0 on success; -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if the array cannot be created
 */
static FORCE_INLINE int tsdbInitReadHBlkIdx(SReadH *pReadh, STsdbRepo *pRepo) {
  ASSERT(pReadh != NULL && pRepo != NULL);

  memset((void *)pReadh, 0, sizeof(*pReadh));
  pReadh->pRepo = pRepo;
  TSDB_FSET_SET_INIT(TSDB_READ_FSET(pReadh));

  pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
  if (pReadh->aBlkIdx == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
#endif /*_TD_TSDB_READ_IMPL_H_*/ #endif /*_TD_TSDB_READ_IMPL_H_*/
...@@ -949,7 +949,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -949,7 +949,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
SBlock *pBlock; SBlock *pBlock;
if (pCommith->readh.pBlkIdx) { if (pCommith->readh.pBlkIdx) {
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { if (tsdbLoadBlockInfo(&(pCommith->readh), NULL, NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
......
...@@ -360,7 +360,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -360,7 +360,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
tsdbUnRefTable(pTh->pTable); tsdbUnRefTable(pTh->pTable);
} }
pTh->pInfo = taosTZfree(pTh->pInfo); // pTh->pInfo = taosTZfree(pTh->pInfo);
tfree(pTh->pInfo);
} }
pComph->tbArray = taosArrayDestroy(pComph->tbArray); pComph->tbArray = taosArrayDestroy(pComph->tbArray);
...@@ -386,11 +387,11 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { ...@@ -386,11 +387,11 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
pTh->bindex = *(pReadH->pBlkIdx); pTh->bindex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bindex); pTh->pBlkIdx = &(pTh->bindex);
if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) { // if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) {
return -1; // return -1;
} // }
int32_t originLen = 0;
if (tsdbLoadBlockInfo(pReadH, (void *)(pTh->pInfo)) < 0) { if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) {
return -1; return -1;
} }
} }
......
...@@ -90,7 +90,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { ...@@ -90,7 +90,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return tlen; return tlen;
} }
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFiles) { static int tsdbDecodeDFileSetArray(void **originBuf, void *buf, SArray *pArray, SFSHeader *pSFSHeader) {
uint64_t nset; uint64_t nset;
SDFileSet dset; SDFileSet dset;
dset.nFiles = TSDB_FILE_MIN; // default value: .head/.data/.last dset.nFiles = TSDB_FILE_MIN; // default value: .head/.data/.last
...@@ -98,11 +98,24 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFil ...@@ -98,11 +98,24 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray, bool containNFil
taosArrayClear(pArray); taosArrayClear(pArray);
buf = taosDecodeFixedU64(buf, &nset); buf = taosDecodeFixedU64(buf, &nset);
if (pSFSHeader->version == TSDB_FS_VER_0) {
uint32_t extendedSize = pSFSHeader->len + nset * TSDB_FILE_MAX * sizeof(TSDB_FVER_TYPE);
if (taosTSizeof(*originBuf) < extendedSize) {
int ptrDistance = POINTER_DISTANCE(buf, *originBuf);
if (tsdbMakeRoom(originBuf, extendedSize) < 0) {
terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
return -1;
}
buf = POINTER_SHIFT(*originBuf, ptrDistance);
}
}
for (size_t i = 0; i < nset; i++) { for (size_t i = 0; i < nset; i++) {
buf = tsdbDecodeDFileSet(buf, &dset, containNFiles); buf = tsdbDecodeDFileSet(buf, &dset, pSFSHeader->version);
taosArrayPush(pArray, (void *)(&dset)); taosArrayPush(pArray, (void *)(&dset));
} }
return buf; return TSDB_CODE_SUCCESS;
} }
static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
...@@ -116,14 +129,12 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { ...@@ -116,14 +129,12 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen; return tlen;
} }
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus, bool containNFiles) { static int tsdbDecodeFSStatus(void **originBuf, void *buf, SFSStatus *pStatus, SFSHeader *pSFSHeader) {
tsdbResetFSStatus(pStatus); tsdbResetFSStatus(pStatus);
pStatus->pmf = &(pStatus->mf); pStatus->pmf = &(pStatus->mf);
buf = tsdbDecodeSMFile(buf, pStatus->pmf); buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df, containNFiles); return tsdbDecodeDFileSetArray(originBuf, buf, pStatus->df, pSFSHeader);
return buf;
} }
static SFSStatus *tsdbNewFSStatus(int maxFSet) { static SFSStatus *tsdbNewFSStatus(int maxFSet) {
...@@ -330,6 +341,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) { ...@@ -330,6 +341,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
return -1; return -1;
} }
// add switch to control
if (tsdbRefactorFS(pRepo) < 0) {
tsdbError("vgId:%d failed to refactor FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// Load meta cache if has meta file // Load meta cache if has meta file
if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) { if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) {
tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -690,7 +707,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { ...@@ -690,7 +707,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
ptr = tsdbDecodeFSHeader(ptr, &fsheader); ptr = tsdbDecodeFSHeader(ptr, &fsheader);
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta)); ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
if (fsheader.version != TSDB_FS_VERSION_0) { if (fsheader.version != TSDB_FS_VER_0) {
// TODO: handle file version change // TODO: handle file version change
} }
...@@ -719,7 +736,9 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { ...@@ -719,7 +736,9 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
} }
ptr = buffer; ptr = buffer;
ptr = tsdbDecodeFSStatus(ptr, pStatus, fsheader.version == TSDB_FS_VERSION_0 ? false : true); if (tsdbDecodeFSStatus(&buffer, ptr, pStatus, &fsheader) < 0) {
goto _err;
}
} else { } else {
tsdbResetFSStatus(pStatus); tsdbResetFSStatus(pStatus);
} }
......
...@@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = { ...@@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
static int tsdbRollBackMFile(SMFile *pMFile); static int tsdbRollBackMFile(SMFile *pMFile);
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo, TSDB_FVER_TYPE sfver);
static int tsdbRollBackDFile(SDFile *pDFile); static int tsdbRollBackDFile(SDFile *pDFile);
// ============== SMFile // ============== SMFile
...@@ -302,6 +302,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, ...@@ -302,6 +302,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
memset(&(pDFile->info), 0, sizeof(pDFile->info)); memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC; pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(vid, fid, ver, ftype, fname); tsdbGetFilename(vid, fid, ver, ftype, fname);
tfsInitFile(&(pDFile->f), did.level, did.id, fname); tfsInitFile(&(pDFile->f), did.level, did.id, fname);
...@@ -321,8 +322,8 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) { ...@@ -321,8 +322,8 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
return tlen; return tlen;
} }
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) { void *tsdbDecodeSDFile(void *buf, SDFile *pDFile, uint32_t sfver) {
buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); buf = tsdbDecodeDFInfo(buf, &(pDFile->info), sfver);
buf = tfsDecodeFile(buf, &(pDFile->f)); buf = tfsDecodeFile(buf, &(pDFile->f));
TSDB_FILE_SET_CLOSED(pDFile); TSDB_FILE_SET_CLOSED(pDFile);
...@@ -341,7 +342,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) { ...@@ -341,7 +342,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
char *aname; char *aname;
buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); buf = tsdbDecodeDFInfo(buf, &(pDFile->info), tsdbGetSFSVersion());
buf = taosDecodeString(buf, &aname); buf = taosDecodeString(buf, &aname);
strncpy(TSDB_FILE_FULL_NAME(pDFile), aname, TSDB_FILENAME_LEN); strncpy(TSDB_FILE_FULL_NAME(pDFile), aname, TSDB_FILENAME_LEN);
TSDB_FILE_SET_CLOSED(pDFile); TSDB_FILE_SET_CLOSED(pDFile);
...@@ -353,7 +354,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { ...@@ -353,7 +354,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755); pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (pDFile->fd < 0) { if (pDFile->fd < 0) {
if (errno == ENOENT) { if (errno == ENOENT) {
// Try to create directory recursively // Try to create directory recursively
...@@ -364,7 +365,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ...@@ -364,7 +365,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
} }
tfree(s); tfree(s);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0755); pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (pDFile->fd < 0) { if (pDFile->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -380,8 +381,9 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ...@@ -380,8 +381,9 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
} }
pDFile->info.size += TSDB_FILE_HEAD_SIZE; pDFile->info.size += TSDB_FILE_HEAD_SIZE;
pDFile->info.fver = tsdbGetDFSVersion(fType);
if (tsdbUpdateDFileHeaderEx(pDFile, tsdbGetDFSVersion(fType)) < 0) { if (tsdbUpdateDFileHeader(pDFile) < 0) {
tsdbCloseDFile(pDFile); tsdbCloseDFile(pDFile);
tsdbRemoveDFile(pDFile); tsdbRemoveDFile(pDFile);
return -1; return -1;
...@@ -390,53 +392,14 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ...@@ -390,53 +392,14 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
return 0; return 0;
} }
// keep the fver in DFileHeader(e.g. during fs check in openFS)
int tsdbUpdateDFileHeader(SDFile *pDFile) { int tsdbUpdateDFileHeader(SDFile *pDFile) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0";
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
tsdbDebug("prop:file %s seek to read fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
if (tsdbReadDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
tsdbDebug("prop:file %s read fail, fd is %d", TSDB_FILE_FULL_NAME(pDFile), pDFile->fd);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbDebug("prop:file %s checksum fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
void *ptr = POINTER_SHIFT(buf, sizeof(uint32_t));
tsdbEncodeDFInfo(&ptr, &(pDFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
tsdbDebug("prop:file %s seek to write fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
if (tsdbWriteDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
tsdbDebug("prop:file %s write fail", TSDB_FILE_FULL_NAME(pDFile));
return -1;
}
return 0;
}
// update the fver in DFileHeader
int tsdbUpdateDFileHeaderEx(SDFile *pDFile, uint32_t fver) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) { if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
return -1; return -1;
} }
void *ptr = buf; void *ptr = buf;
taosEncodeFixedU32(&ptr, fver);
tsdbEncodeDFInfo(&ptr, &(pDFile->info)); tsdbEncodeDFInfo(&ptr, &(pDFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
...@@ -467,8 +430,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { ...@@ -467,8 +430,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
} }
void *pBuf = buf; void *pBuf = buf;
pBuf = taosDecodeFixedU32(pBuf, &(pDFile->fver)); pBuf = tsdbDecodeDFInfo(pBuf, pInfo, TSDB_FS_VER_1);
pBuf = tsdbDecodeDFInfo(pBuf, pInfo);
return 0; return 0;
} }
...@@ -526,7 +488,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { ...@@ -526,7 +488,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU32(buf, pInfo->magic); tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->len); tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
...@@ -538,7 +500,12 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { ...@@ -538,7 +500,12 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
return tlen; return tlen;
} }
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo, TSDB_FVER_TYPE sfver) {
if (sfver > TSDB_FS_VER_0) {
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
} else {
pInfo->fver = TSDB_FS_VER_0; // default value
}
buf = taosDecodeFixedU32(buf, &(pInfo->magic)); buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
...@@ -620,25 +587,27 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { ...@@ -620,25 +587,27 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
tlen += taosEncodeFixedI32(buf, pSet->fid); tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedU8(buf, pSet->nFiles); tlen += taosEncodeFixedU8(buf, pSet->nFiles);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
} }
return tlen; return tlen;
} }
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, bool containNFiles) { void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet, uint32_t sfver) {
int32_t fid; int32_t fid;
buf = taosDecodeFixedI32(buf, &(fid)); buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0; pSet->state = 0;
pSet->fid = fid; pSet->fid = fid;
if (containNFiles) {
if (sfver > TSDB_FS_VER_0) {
buf = taosDecodeFixedU8(buf, &(pSet->nFiles)); buf = taosDecodeFixedU8(buf, &(pSet->nFiles));
} }
ASSERT(TSDB_FSET_NFILES_VALID(pSet)); ASSERT(TSDB_FSET_NFILES_VALID(pSet));
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype), sfver);
} }
return buf; return buf;
} }
...@@ -647,7 +616,8 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) { ...@@ -647,7 +616,8 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid); tlen += taosEncodeFixedI32(buf, pSet->fid);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { tlen += taosEncodeFixedU8(buf, pSet->nFiles);
for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
} }
...@@ -658,8 +628,9 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) { ...@@ -658,8 +628,9 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
int32_t fid; int32_t fid;
buf = taosDecodeFixedI32(buf, &(fid)); buf = taosDecodeFixedI32(buf, &(fid));
buf = taosDecodeFixedU8(buf, &(pSet->nFiles));
pSet->fid = fid; pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < pSet->nFiles; ftype++) {
buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
} }
return buf; return buf;
......
...@@ -673,7 +673,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -673,7 +673,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
tdInitDataRow(memRowDataBody(row), pSchema); tdInitDataRow(memRowDataBody(row), pSchema);
// first load block index info // first load block index info
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) {
err = -1; err = -1;
goto out; goto out;
} }
...@@ -775,7 +775,7 @@ out: ...@@ -775,7 +775,7 @@ out:
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL); ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) {
return -1; return -1;
} }
......
...@@ -1086,21 +1086,23 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int ...@@ -1086,21 +1086,23 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int
return 0; // no data blocks in the file belongs to pCheckInfo->pTable return 0; // no data blocks in the file belongs to pCheckInfo->pTable
} }
if (pCheckInfo->compSize < (int32_t)compIndex->len) {
assert(compIndex->len > 0); assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); // if (pCheckInfo->compSize < (int32_t)compIndex->len) {
if (t == NULL) { // assert(compIndex->len > 0);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
return code;
}
pCheckInfo->pCompInfo = (SBlockInfo*)t; // char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
pCheckInfo->compSize = compIndex->len; // if (t == NULL) {
} // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// code = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return code;
// }
// pCheckInfo->pCompInfo = (SBlockInfo*)t;
// pCheckInfo->compSize = compIndex->len;
// }
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) { if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo), &pCheckInfo->compSize) < 0) {
return terrno; return terrno;
} }
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
......
...@@ -33,7 +33,8 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { ...@@ -33,7 +33,8 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
memset((void *)pReadh, 0, sizeof(*pReadh)); memset((void *)pReadh, 0, sizeof(*pReadh));
pReadh->pRepo = pRepo; pReadh->pRepo = pRepo;
TSDB_FSET_SET_INIT(TSDB_READ_FSET(pReadh));
TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));
pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pReadh->aBlkIdx == NULL) { if (pReadh->aBlkIdx == NULL) {
...@@ -199,6 +200,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { ...@@ -199,6 +200,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
return 0; return 0;
} }
#if 0
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
ASSERT(pReadh->pBlkIdx != NULL); ASSERT(pReadh->pBlkIdx != NULL);
...@@ -242,6 +244,124 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { ...@@ -242,6 +244,124 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
return 0; return 0;
} }
#endif
/**
 * Map a data-file version to the SBlock layout version stored in that file.
 *
 * Only TSDB_FS_VER_0 files use TSDB_SBLK_VER_0; every other value, including versions
 * this build does not know, is treated as TSDB_SBLK_VER_1.
 */
static FORCE_INLINE int32_t tsdbGetSBlockVer(int32_t fver) {
  return (fver == TSDB_FS_VER_0) ? TSDB_SBLK_VER_0 : TSDB_SBLK_VER_1;
}
/**
 * Size in bytes of one SBlock entry for the given SBlock version.
 *
 * Unknown versions fall back to the size of the current SBlock definition.
 */
static FORCE_INLINE size_t tsdbSizeOfSBlock(int32_t sBlkVer) {
  if (sBlkVer == TSDB_SBLK_VER_0) {
    return sizeof(SBlockV0);
  }
  if (sBlkVer == TSDB_SBLK_VER_1) {
    return sizeof(SBlockV1);
  }
  return sizeof(SBlock);
}
/**
 * Hand a source block-info buffer over to *pDstBlkInfo without copying.
 *
 * Whatever buffer the slot held before is released first; otherwise every call that takes
 * this path would orphan the previously stored buffer.
 */
static void tsdbHeadAdoptBlkInfo(SBlockInfo **pDstBlkInfo, SBlockInfo *pSrcBlkInfo) {
  if (*pDstBlkInfo != NULL && *pDstBlkInfo != pSrcBlkInfo) {
    taosTZfree(*pDstBlkInfo);
  }
  *pDstBlkInfo = pSrcBlkInfo;
}

/**
 * Bring a block-info buffer read from a head file to the latest SBlockInfo layout.
 *
 * @param pHeadf         head file the buffer came from; pHeadf->info.fver selects the SBlock version
 * @param pSrcBlkInfo    buffer holding the block info as read from the file
 * @param srcBlkInfoLen  number of bytes in pSrcBlkInfo
 * @param pDstBlkInfo    in/out slot that ends up holding the block info in the latest layout
 * @param dstBlkInfoLen  out: size in bytes of what *pDstBlkInfo holds on success
 * @return TSDB_CODE_SUCCESS, or -1 if the destination buffer cannot be enlarged
 *
 * Ownership: on success pSrcBlkInfo is either adopted by *pDstBlkInfo (no conversion needed)
 * or released once its content has been copied. On failure pSrcBlkInfo is untouched and
 * still belongs to the caller.
 */
static int tsdbHeadRefactor(SDFile *pHeadf, SBlockInfo *pSrcBlkInfo, uint32_t srcBlkInfoLen, SBlockInfo **pDstBlkInfo,
                            int32_t *dstBlkInfoLen) {
  int sBlkVer = tsdbGetSBlockVer(pHeadf->info.fver);
  if (sBlkVer == SBlockVerLatest) {
    // Already in the latest layout: take over the source buffer as is.
    tsdbHeadAdoptBlkInfo(pDstBlkInfo, pSrcBlkInfo);
    *dstBlkInfoLen = srcBlkInfoLen;
    return TSDB_CODE_SUCCESS;
  }

  uint32_t originBlkSize = tsdbSizeOfSBlock(sBlkVer);
  // Integer division drops any trailing bytes smaller than one stored SBlock.
  int nBlks = (srcBlkInfoLen - sizeof(SBlockInfo)) / originBlkSize;

  *dstBlkInfoLen = sizeof(SBlockInfo) + nBlks * sizeof(SBlock);
  if (srcBlkInfoLen == *dstBlkInfoLen) {
    tsdbHeadAdoptBlkInfo(pDstBlkInfo, pSrcBlkInfo);
    return TSDB_CODE_SUCCESS;
  }

  ASSERT(*dstBlkInfoLen >= srcBlkInfoLen);

  if (tsdbMakeRoom((void **)(pDstBlkInfo), *dstBlkInfoLen) < 0) return -1;
  memset(*pDstBlkInfo, 0, *dstBlkInfoLen);                // the blkVer is set to 0
  memcpy(*pDstBlkInfo, pSrcBlkInfo, sizeof(SBlockInfo));  // copy header

  // Each old entry is a prefix of the latest SBlock, so copy it to the start of its widened slot.
  for (int i = 0; i < nBlks; ++i) {
    memcpy((*pDstBlkInfo)->blocks + i, POINTER_SHIFT(pSrcBlkInfo->blocks, i * originBlkSize), originBlkSize);
    // TODO: update the fields if the SBlock definition changed later
  }

  taosTZfree(pSrcBlkInfo);
  return TSDB_CODE_SUCCESS;
}
/**
 * Load the SBlockInfo of the table currently selected in the read handle.
 *
 * The block info is read from the head file at pReadh->pBlkIdx->offset, its checksum is verified,
 * and tsdbHeadRefactor() leaves the result, in the latest layout, in pReadh->pBlkInfo.
 *
 * @param pReadh       read handle; pReadh->pBlkIdx must be set
 * @param pTarget      optional: address of a caller buffer that receives a copy of the block info.
 *                     It is only used when extendedLen is non-NULL as well; the buffer is grown
 *                     with realloc() when *extendedLen is smaller than the loaded size.
 * @param extendedLen  optional in/out: on entry the capacity of *pTarget, on success the size in
 *                     bytes of the loaded block info
 * @return TSDB_CODE_SUCCESS on success, -1 on failure (terrno is set on corruption and on
 *         out-of-memory while growing *pTarget)
 */
int tsdbLoadBlockInfo(SReadH *pReadh, void **pTarget, int32_t *extendedLen) {
  ASSERT(pReadh->pBlkIdx != NULL);

  SBlockIdx  *pIdx = pReadh->pBlkIdx;
  SDFile     *pFile = TSDB_READ_HEAD_FILE(pReadh);
  SBlockInfo *pRaw = NULL;   // block info exactly as stored in the file
  int32_t     loadedLen = 0; // size after conversion to the latest layout

  if (tsdbSeekDFile(pFile, pIdx->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), tstrerror(terrno), pIdx->offset, pIdx->len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&pRaw), pIdx->len) < 0) {
    return -1;
  }

  int64_t nread = tsdbReadDFile(pFile, (void *)pRaw, pIdx->len);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load SBlockInfo part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), tstrerror(terrno), pIdx->offset, pIdx->len);
    goto _err;
  }

  if (nread < pIdx->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pIdx->offset, pIdx->len, nread);
    goto _err;
  }

  if (!taosCheckChecksumWhole((uint8_t *)pRaw, pIdx->len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pIdx->offset, pIdx->len);
    goto _err;
  }

  ASSERT(pIdx->tid == pRaw->tid && pIdx->uid == pRaw->uid);

  if (tsdbHeadRefactor(pFile, pRaw, pIdx->len, &(pReadh->pBlkInfo), &loadedLen) < 0) {
    goto _err;
  }

  // Without a length slot there is nothing to report and nothing is copied.
  if (extendedLen == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTarget != NULL) {
    if (*extendedLen < loadedLen) {
      char *grown = realloc(*pTarget, loadedLen);
      if (grown == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
      *pTarget = grown;
    }
    memcpy(*pTarget, (void *)(pReadh->pBlkInfo), loadedLen);
  }

  *extendedLen = loadedLen;
  return TSDB_CODE_SUCCESS;

_err:
  // The raw buffer was never handed over to the read handle, so drop it here.
  taosTZfree(pRaw);
  return -1;
}
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
ASSERT(pBlock->numOfSubBlocks > 0); ASSERT(pBlock->numOfSubBlocks > 0);
......
...@@ -12,3 +12,365 @@ ...@@ -12,3 +12,365 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbint.h"
// Working state used while rewriting head files: a read handle for the
// existing file set, the file set being written, and scratch arrays that
// collect block metadata before it is written out.
typedef struct {
// Read handle over the source file set (initialised by tsdbInitReadH).
SReadH readh;
// File set whose head file receives the rewritten data (see TSDB_RECOVER_WFSET).
SDFileSet wSet;
SArray * aBlkIdx; // SBlockIdx entries, one pushed per SBlockInfo part written
SArray * aSupBlk; // super-blocks (SBlock) collected for the current table
SArray * aSubBlk; // sub-blocks (SBlock) collected for the current table
} SRecoverH;
// Accessors for the write-side file set of an SRecoverH and for individual
// files in it (head / data / last), resolved through TSDB_DFILE_IN_SET.
#define TSDB_RECOVER_WFSET(rh) (&((rh)->wSet))
#define TSDB_RECOVER_WHEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_RECOVER_WFSET(rh), TSDB_FILE_HEAD)
#define TSDB_RECOVER_WDATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_RECOVER_WFSET(rh), TSDB_FILE_DATA)
#define TSDB_RECOVER_WLAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_RECOVER_WFSET(rh), TSDB_FILE_LAST)
// Forward declarations of the file-local helpers defined below.
// Set up / tear down the SRecoverH working state.
static int tsdbInitRecoverH(SRecoverH *pRecoverH, STsdbRepo *pRepo);
static int tsdbDestoryRecoverH(SRecoverH *pRecoverH);
// Create / close-and-remove the head file that is being written.
static int tsdbInitHFile(STsdbRepo *pRepo, SDFile *pDestDFile, const SDFile *pSrcDFile, int fid);
static int tsdbDestroyHFile(SDFile *pDFile);
// Append the collected SBlockInfo part / the SBlockIdx part to the written head file.
static int tsdbHeadWriteBlockInfo(SRecoverH *pRecoverH);
static int tsdbHeadWriteBlockIdx(SRecoverH *pRecoverH);
// Collect one super-block (and optionally its sub-blocks) into the scratch arrays.
static int tsdbHeadAddBlock(SRecoverH *pRecoverH, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
/**
 * Zero an SRecoverH and set up its read handle and the three scratch arrays.
 *
 * If any array allocation fails, terrno is set to
 * TSDB_CODE_TDB_OUT_OF_MEMORY and everything initialised so far is released
 * through tsdbDestoryRecoverH.
 *
 * @return 0 on success, -1 on failure.
 */
static int tsdbInitRecoverH(SRecoverH *pRecoverH, STsdbRepo *pRepo) {
  memset(pRecoverH, 0, sizeof(SRecoverH));

  // The read handle comes first; nothing else is allocated if it fails.
  if (tsdbInitReadH(&(pRecoverH->readh), pRepo) < 0) {
    return -1;
  }

  if ((pRecoverH->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx))) == NULL) goto _oom;
  if ((pRecoverH->aSupBlk = taosArrayInit(1024, sizeof(SBlock))) == NULL) goto _oom;
  if ((pRecoverH->aSubBlk = taosArrayInit(1024, sizeof(SBlock))) == NULL) goto _oom;

  return 0;

_oom:
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  tsdbDestoryRecoverH(pRecoverH);
  return -1;
}
/**
 * Release the scratch arrays and the read handle owned by an SRecoverH.
 *
 * Each array field is overwritten with the value returned by
 * taosArrayDestroy. Always returns 0.
 */
static int tsdbDestoryRecoverH(SRecoverH *pRecoverH) {
  // Same destruction order as the fields are listed here: sub, sup, idx.
  SArray **ppArrays[] = {&pRecoverH->aSubBlk, &pRecoverH->aSupBlk, &pRecoverH->aBlkIdx};

  for (size_t i = 0; i < sizeof(ppArrays) / sizeof(ppArrays[0]); ++i) {
    *ppArrays[i] = taosArrayDestroy(*ppArrays[i]);
  }

  tsdbDestroyReadH(&(pRecoverH->readh));
  return 0;
}
/**
 * Serialise the blocks collected in aSupBlk/aSubBlk as one SBlockInfo part,
 * append it to the head file being written, and record a matching SBlockIdx
 * in aBlkIdx.
 *
 * Layout written: SBlockInfo header, all super-blocks, all sub-blocks,
 * trailing checksum. The table id/uid are taken from the read handle's
 * current pBlkIdx.
 *
 * @return 0 on success (including the "no super-blocks" case, where nothing
 *         is written), -1 on failure.
 */
static int tsdbHeadWriteBlockInfo(SRecoverH *pRecoverH) {
  SReadH *   pReadH = &pRecoverH->readh;
  SDFile *   pWHeadf = TSDB_RECOVER_WHEAD_FILE(pRecoverH);
  SBlockIdx *pSrcIdx = pReadH->pBlkIdx;
  uint32_t   nSup = (uint32_t)taosArrayGetSize(pRecoverH->aSupBlk);
  uint32_t   nSub = (uint32_t)taosArrayGetSize(pRecoverH->aSubBlk);

  // No data (data all deleted)
  if (nSup == 0) {
    return 0;
  }

  uint32_t tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSup + nSub) + sizeof(TSCKSUM));

  // Build the SBlockInfo part in the read handle's scratch buffer.
  if (tsdbMakeRoom((void **)(&(TSDB_READ_BUF(pReadH))), tlen) < 0) {
    return -1;
  }

  SBlockInfo *pBlkInfo = TSDB_READ_BUF(pReadH);
  pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
  pBlkInfo->tid = pSrcIdx->tid;
  pBlkInfo->uid = pSrcIdx->uid;

  memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pRecoverH->aSupBlk, 0), nSup * sizeof(SBlock));

  if (nSub > 0) {
    memcpy((void *)(pBlkInfo->blocks + nSup), taosArrayGet(pRecoverH->aSubBlk, 0), nSub * sizeof(SBlock));

    // Offsets of super-blocks that own sub-blocks were stored relative to the
    // sub-block array; shift them past the header and the super-block array.
    for (uint32_t i = 0; i < nSup; ++i) {
      SBlock *pCur = pBlkInfo->blocks + i;
      if (pCur->numOfSubBlocks > 1) {
        pCur->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSup);
      }
    }
  }

  taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);

  int64_t offset = 0;
  if (tsdbAppendDFile(pWHeadf, TSDB_READ_BUF(pReadH), tlen, &offset) < 0) {
    return -1;
  }

  tsdbUpdateDFileMagic(pWHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));

  // Describe the part just written; the last super-block supplies
  // hasLast and maxKey.
  SBlock *  pLastSup = taosArrayGet(pRecoverH->aSupBlk, nSup - 1);
  SBlockIdx blkIdx;
  blkIdx.tid = pSrcIdx->tid;
  blkIdx.uid = pSrcIdx->uid;
  blkIdx.hasLast = pLastSup->last ? 1 : 0;
  blkIdx.maxKey = pLastSup->keyLast;
  blkIdx.numOfBlocks = nSup;
  blkIdx.len = tlen;
  blkIdx.offset = (uint32_t)offset;

  ASSERT(blkIdx.numOfBlocks > 0);

  if (taosArrayPush(pRecoverH->aBlkIdx, (void *)(&blkIdx)) == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
/**
 * Encode every SBlockIdx collected in aBlkIdx, append the encoded run plus a
 * checksum to the head file being written, and store the run's offset and
 * length in the file's info.
 *
 * When no index entries were collected the file's info offset/len are set to
 * 0 and nothing is written.
 *
 * @return 0 on success, -1 on failure.
 */
static int tsdbHeadWriteBlockIdx(SRecoverH *pRecoverH) {
  SReadH * pReadH = &pRecoverH->readh;
  SDFile * pWHeadf = TSDB_RECOVER_WHEAD_FILE(pRecoverH);
  uint32_t nidx = (uint32_t)taosArrayGetSize(pRecoverH->aBlkIdx);

  // All data are deleted
  if (nidx == 0) {
    pWHeadf->info.offset = 0;
    pWHeadf->info.len = 0;
    return 0;
  }

  // Encode the entries back to back into the scratch buffer, growing it
  // before each entry is written.
  int tlen = 0;
  for (uint32_t i = 0; i < nidx; ++i) {
    SBlockIdx *pEntry = (SBlockIdx *)taosArrayGet(pRecoverH->aBlkIdx, i);
    int        encLen = tsdbEncodeSBlockIdx(NULL, pEntry);

    if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadH)), tlen + encLen) < 0) {
      return -1;
    }

    void *pCursor = POINTER_SHIFT(TSDB_READ_BUF(pReadH), tlen);
    tsdbEncodeSBlockIdx(&pCursor, pEntry);
    tlen += encLen;
  }

  // Reserve room for, then append, the trailing checksum.
  tlen += sizeof(TSCKSUM);
  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadH)), tlen) < 0) {
    return -1;
  }
  taosCalcChecksumAppend(0, (uint8_t *)TSDB_READ_BUF(pReadH), tlen);

  int64_t offset = 0;
  if (tsdbAppendDFile(pWHeadf, TSDB_READ_BUF(pReadH), tlen, &offset) < tlen) {
    tsdbError("vgId:%d failed to write block index part to file %s since %s", REPO_ID(pReadH->pRepo),
              TSDB_FILE_FULL_NAME(pWHeadf), tstrerror(terrno));
    return -1;
  }

  tsdbUpdateDFileMagic(pWHeadf, POINTER_SHIFT(TSDB_READ_BUF(pReadH), tlen - sizeof(TSCKSUM)));
  pWHeadf->info.offset = (uint32_t)offset;
  pWHeadf->info.len = tlen;

  return 0;
}
/**
 * Append one super-block to aSupBlk and, when pSubBlocks is non-NULL, append
 * nSubBlocks sub-blocks to aSubBlk.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if
 *         either append fails (sub-blocks are not attempted when the
 *         super-block push fails).
 */
static int tsdbHeadAddBlock(SRecoverH *pRecoverH, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
  void *pAdded = taosArrayPush(pRecoverH->aSupBlk, pSupBlock);

  if (pAdded != NULL && pSubBlocks != NULL) {
    pAdded = taosArrayAddBatch(pRecoverH->aSubBlk, pSubBlocks, nSubBlocks);
  }

  if (pAdded == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
/**
 * Initialise pDestDFile as a head file on the same disk (level/id) as
 * pSrcDFile, using the repo's current FS transaction version, and create it.
 *
 * @return TSDB_CODE_SUCCESS on success, -1 if the file cannot be created
 *         (the failure is logged).
 */
static int tsdbInitHFile(STsdbRepo *pRepo, SDFile *pDestDFile, const SDFile *pSrcDFile, int fid) {
  // Place the new file on the same disk as the source file.
  SDiskID did;
  did.level = pSrcDFile->f.level;
  did.id = pSrcDFile->f.id;

  tsdbInitDFile(pDestDFile, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);

  if (tsdbCreateDFile(pDestDFile, true, TSDB_FILE_HEAD) >= 0) {
    return TSDB_CODE_SUCCESS;
  }

  tsdbError("vgId:%d failed to create file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDestDFile),
            tstrerror(terrno));
  return -1;
}
/**
 * Close the given file and then remove it.
 *
 * @return whatever tsdbRemoveDFile returns.
 */
static int tsdbDestroyHFile(SDFile *pDFile) {
  int code;

  tsdbCloseDFile(pDFile);
  code = tsdbRemoveDFile(pDFile);

  return code;
}
/**
 * Bring the head file of one file set up to the latest head-file version and
 * register the resulting file set in the pending FS status (nstatus->df).
 *
 * If the head file already carries the version returned by
 * tsdbGetDFSVersion(TSDB_FILE_HEAD), the set is pushed unchanged and *nRemain
 * is incremented. Otherwise every SBlockInfo part is loaded through
 * tsdbLoadBlockInfo, its blocks are re-collected and written to a newly
 * created head file, followed by the SBlockIdx part and the file header; the
 * set with the new head file is then pushed.
 *
 * @param pRepo     repository being upgraded.
 * @param pRecoverH working state (read handle, write set, scratch arrays).
 * @param pSet      file set to process.
 * @param nRemain   in/out counter of sets that needed no rewrite.
 * @return TSDB_CODE_SUCCESS on success, -1 on failure. On failure the read
 *         set is closed and the newly created head file, if any, is removed.
 */
static int tsdbRefactorHeadF(STsdbRepo *pRepo, SRecoverH *pRecoverH, SDFileSet *pSet, int32_t *nRemain) {
  SDFile *pHeadF = TSDB_DFILE_IN_SET(pSet, TSDB_FILE_HEAD);

  // Already at the latest version: keep the set as it is.
  if (pHeadF->info.fver == tsdbGetDFSVersion(TSDB_FILE_HEAD)) {
    if (taosArrayPush(REPO_FS(pRepo)->nstatus->df, pSet) == NULL) {
      terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
      return -1;
    }
    ++*nRemain;
    return TSDB_CODE_SUCCESS;
  }

  SReadH *   pReadH = &(pRecoverH->readh);
  SDFile *   pTmpHeadF = TSDB_RECOVER_WHEAD_FILE(pRecoverH);
  SDFileSet *pDestFSet = NULL;
  SBlock     supBlk;
  int        arraySize = 0;
  bool       tmpCreated = false;  // true once the new head file exists on disk

  if (tsdbSetAndOpenReadFSet(pReadH, pSet) < 0) {
    return -1;
  }

  if (tsdbLoadBlockIdx(pReadH) < 0) {
    goto _err;
  }

  if (tsdbInitHFile(pRepo, pTmpHeadF, pHeadF, pSet->fid) < 0) {
    tsdbError("vgId:%d failed to init file %s to refactor since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pTmpHeadF),
              tstrerror(terrno));
    goto _err;
  }
  tmpCreated = true;

  arraySize = (int)taosArrayGetSize(pReadH->aBlkIdx);
  for (int iBlkIdx = 0; iBlkIdx < arraySize; ++iBlkIdx) {
    pReadH->pBlkIdx = taosArrayGet(pReadH->aBlkIdx, iBlkIdx);
    pReadH->cidx = iBlkIdx;

    if (tsdbLoadBlockInfo(pReadH, NULL, NULL) < 0) {
      goto _err;
    }

    // clear the reused resource
    taosArrayClear(pRecoverH->aSupBlk);
    taosArrayClear(pRecoverH->aSubBlk);

    for (uint32_t iSupBlk = 0; iSupBlk < pReadH->pBlkIdx->numOfBlocks; ++iSupBlk) {
      SBlock *pSupBlk = pReadH->pBlkInfo->blocks + iSupBlk;

      if (pSupBlk->numOfSubBlocks == 1) {
        if (tsdbHeadAddBlock(pRecoverH, pSupBlk, NULL, 0) < 0) {
          goto _err;
        }
      } else {
        // Store the super-block with an offset relative to the sub-block
        // array; tsdbHeadWriteBlockInfo shifts it to its final position.
        supBlk = *pSupBlk;
        supBlk.offset = sizeof(SBlock) * taosArrayGetSize(pRecoverH->aSubBlk);
        if (tsdbHeadAddBlock(pRecoverH, &supBlk, POINTER_SHIFT(pReadH->pBlkInfo, pSupBlk->offset),
                             pSupBlk->numOfSubBlocks) < 0) {
          goto _err;
        }
      }
    }

    if (tsdbHeadWriteBlockInfo(pRecoverH) < 0) {
      tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", REPO_ID(pReadH->pRepo),
                TSDB_FILE_FULL_NAME(pTmpHeadF), tstrerror(terrno));
      goto _err;
    }
  }

  if (tsdbHeadWriteBlockIdx(pRecoverH) < 0) {
    tsdbError("vgId:%d failed to write SBlockIdx part into file %s since %s", REPO_ID(pReadH->pRepo),
              TSDB_FILE_FULL_NAME(pTmpHeadF), tstrerror(terrno));
    goto _err;
  }

  if (tsdbUpdateDFileHeader(pTmpHeadF) < 0) {
    tsdbError("vgId:%d failed to update header of file %s since %s", REPO_ID(pReadH->pRepo),
              TSDB_FILE_FULL_NAME(pTmpHeadF), tstrerror(terrno));
    goto _err;
  }

  // resource release
  tsdbCloseDFileSet(TSDB_READ_FSET(pReadH));
  // NOTE(review): the result of the fsync is not checked - confirm whether a
  // failed sync should abort the upgrade of this set.
  TSDB_FILE_FSYNC(pTmpHeadF);
  tsdbCloseDFile(pTmpHeadF);

  // Publish the read set with its head file replaced by the new one.
  pDestFSet = TSDB_READ_FSET(pReadH);
  tsdbInitDFileEx(TSDB_DFILE_IN_SET(pDestFSet, TSDB_FILE_HEAD), pTmpHeadF);
  if (taosArrayPush(REPO_FS(pRepo)->nstatus->df, pDestFSet) == NULL) {
    // The new head file is already closed; remove it so this failure path
    // does not leave it on disk, like every other failure path above.
    (void)tsdbRemoveDFile(pTmpHeadF);
    terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
    return -1;
  }

  return TSDB_CODE_SUCCESS;

_err:
  tsdbCloseDFileSet(TSDB_READ_FSET(pReadH));
  if (tmpCreated) {
    tsdbDestroyHFile(pTmpHeadF);
  }
  return -1;
}
/**
 * Upgrade every file set of the repository's current FS status whose head
 * file is not at the latest version.
 *
 * Runs inside an FS transaction while holding pRepo->readyToCommit. Each file
 * set is processed by tsdbRefactorHeadF. If no set needed a rewrite the
 * transaction is ended through tsdbEndFSTxnWithError (nothing to apply);
 * otherwise the current meta-file record is copied into the pending status
 * and the transaction is ended with tsdbEndFSTxn.
 *
 * @return TSDB_CODE_SUCCESS on success or when there is nothing to do,
 *         -1 on failure.
 */
int tsdbRefactorFS(STsdbRepo *pRepo) {
  STsdbFS *  pfs = REPO_FS(pRepo);
  SFSStatus *pStatus = pfs->cstatus;

  // Guard before the first dereference: without a current status there are
  // no file sets to upgrade.
  if (pStatus == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  size_t size = taosArrayGetSize(pStatus->df);
  if (size == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SRecoverH recoverH;
  if (tsdbInitRecoverH(&recoverH, pRepo) < 0) {
    return -1;
  }

  tsem_wait(&pRepo->readyToCommit);
  tsdbStartFSTxn(pRepo, 0, 0);

  int32_t nRemain = 0;  // file sets that were already up to date
  for (size_t i = 0; i < size; ++i) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i);
    if (tsdbRefactorHeadF(pRepo, &recoverH, pSet, &nRemain) < 0) {
      // Log first so the reported terrno is the one set by the failure and
      // not whatever the teardown calls may leave behind.
      tsdbError("vgId:%d failed to refactor DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
      tsdbDestoryRecoverH(&recoverH);
      tsdbEndFSTxnWithError(REPO_FS(pRepo));
      tsem_post(&pRepo->readyToCommit);
      return -1;
    }
  }

  tsdbDestoryRecoverH(&recoverH);

  if ((size_t)nRemain == size) {
    // Nothing was rewritten: drop the pending status instead of applying it.
    tsdbEndFSTxnWithError(REPO_FS(pRepo));
  } else {
    // Carry the meta-file record over to the pending status.
    // NOTE(review): pmf is always pointed at the copied record; confirm that
    // the current status can never have "no meta file" here.
    pfs->nstatus->mf = pStatus->mf;
    pfs->nstatus->pmf = &pfs->nstatus->mf;

    if (tsdbEndFSTxn(pRepo) < 0) {
      tsem_post(&pRepo->readyToCommit);
      return -1;
    }
  }

  tsem_post(&pRepo->readyToCommit);
  return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册