From 91d7c7748f29af9049d2acce352cdc88ab15e4af Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 3 Sep 2020 17:21:23 +0800 Subject: [PATCH] refactor part of code --- src/tsdb/inc/tsdbMain.h | 8 +- src/tsdb/src/tsdbFile.c | 73 +++++++++------- src/tsdb/src/tsdbMain.c | 4 +- src/tsdb/src/tsdbRWHelper.c | 168 ++++++++++++++++++++++-------------- 4 files changed, 153 insertions(+), 100 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index e7a86798ee..ede0b33d01 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -45,6 +45,8 @@ extern int tsdbDebugFlag; #define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF +#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax))) + // NOTE: Any file format change must increase this version number by 1 // Also, implement the convert function #define TSDB_FILE_VERSION ((uint32_t)0) @@ -475,6 +477,7 @@ int tsdbUpdateFileHeader(SFile* pFile); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); +int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); @@ -513,7 +516,10 @@ int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbWriteCompInfo(SRWHelper* pHelper); int tsdbWriteCompIdx(SRWHelper* pHelper); +int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); +int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SCompIdx** ppCompIdx, int* numOfIdx); int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); +int tsdbLoadCompInfoImpl(SFile* pFile, SCompIdx* pIdx, SCompInfo** ppCompInfo); int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); @@ -537,7 +543,7 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) char* tsdbGetMetaFileName(char* rootDir); -void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname); +void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname); int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); char* tsdbGetDataDirName(char* rootDir); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 29e46a88af..1009a61e86 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -302,7 +302,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { memset((void *)pFile, 0, sizeof(SFile)); pFile->fd = -1; - tsdbGetDataFileName(pRepo, fid, type, pFile->fname); + tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); if (access(pFile->fname, F_OK) == 0) { tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname); @@ -424,33 +424,57 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { } } -void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - uint32_t version = 0; - STsdbFileInfo info = {0}; +int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - int fd = open(fname, O_RDONLY); - if (fd < 0) goto _err; + if (lseek(pFile->fd, 0, SEEK_SET) < 0) { + tsdbError("failed to lseek file %s to start since %s", pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - if (taosTRead(fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err; + if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE, + strerror(errno)); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } - if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) goto _err; + if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { + tsdbError("file %s header part is corrupted with failed checksum", pFile->fname); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } void *pBuf = (void *)buf; - pBuf = taosDecodeFixedU32(pBuf, &version); - pBuf = tsdbDecodeSFileInfo(pBuf, &info); + pBuf = taosDecodeFixedU32(pBuf, version); + pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); + + return 0; +} - off_t offset = lseek(fd, 0, SEEK_END); +void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { + uint32_t version = 0; + SFile file = {0}; + SFile * pFile = &file; + + strncpy(pFile->fname, fname, TSDB_FILENAME_LEN); + pFile->fd = -1; + + if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; + if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err; + + off_t offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) goto _err; - close(fd); + tsdbCloseFile(pFile); - *magic = info.magic; + *magic = pFile->info.magic; *size = offset; return; _err: - if (fd >= 0) close(fd); + tsdbCloseFile(pFile); *magic = TSDB_FILE_INIT_MAGIC; *size = 0; } @@ -458,34 +482,23 @@ _err: // ---------------- LOCAL FUNCTIONS ---------------- static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { uint32_t version; - char buf[512] = "\0"; - tsdbGetDataFileName(pRepo, fid, type, pFile->fname); + tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); pFile->fd = -1; if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; - if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE, - pFile->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + if (tsdbLoadFileHeader(pFile, &version) < 0) { + tsdbError("vgId:%d failed to load file %s header part since %s", REPO_ID(pRepo), pFile->fname, tstrerror(terrno)); goto _err; } - if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { - tsdbError("vgId:%d file %s head part is corrupted", REPO_ID(pRepo), pFile->fname); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - goto _err; - } - - void *pBuf = buf; - pBuf = taosDecodeFixedU32(pBuf, &version); - pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); if (pFile->info.size == TSDB_FILE_HEAD_SIZE) { pFile->info.size = lseek(pFile->fd, 0, SEEK_END); } if (version != TSDB_FILE_VERSION) { + // TODO: deal with error tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem", REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index ea912ad1f4..294df10edb 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -354,8 +354,8 @@ char *tsdbGetMetaFileName(char *rootDir) { return fname; } -void tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type, char *fname) { - snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]); +void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) { + snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]); } int tsdbLockRepo(STsdbRepo *pRepo) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 84f22918ec..db24eae148 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -102,7 +102,8 @@ void tsdbResetHelper(SRWHelper *pHelper) { int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ASSERT(pHelper != NULL && pGroup != NULL); - SFile *pFile = NULL; + SFile * pFile = NULL; + STsdbRepo *pRepo = pHelper->pRepo; // Clear the helper object tsdbResetHelper(pHelper); @@ -112,8 +113,10 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { // Set the files pHelper->files.fGroup = *pGroup; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); - tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); + tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, + helperNewHeadF(pHelper)->fname); + tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST, + helperNewLastF(pHelper)->fname); } // Open the files @@ -443,10 +446,64 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { return 0; } +int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) { + const char *prefixMsg = "failed to load SCompIdx part"; + if (lseek(pFile->fd, offset, SEEK_SET) < 0) { + tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, pFile->fname, offset, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (taosTRead(pFile->fd, buffer, len) < len) { + tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len, + strerror(errno)); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)buffer, len)) { + tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, pFile->fname, offset, len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + return 0; +} + +int tsdbDecodeSCompIdxImpl(void *buffer, uint32_t len, SCompIdx **ppCompIdx, int *numOfIdx) { + int nIdx = 0; + void *pPtr = buffer; + + while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) { + size_t tlen = taosTSizeof(*ppCompIdx); + if (tlen < sizeof(SCompIdx) * (nIdx + 1)) { + *ppCompIdx = (SCompIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2); + if (*ppCompIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + pPtr = tsdbDecodeSCompIdx(pPtr, &((*ppCompIdx)[nIdx])); + if (pPtr == NULL) { + tsdbError("failed to decode SCompIdx part, idx:%d", nIdx); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + nIdx++; + + ASSERT(nIdx == 1 || (*ppCompIdx)[nIdx - 1].tid > (*ppCompIdx)[nIdx - 2].tid); + ASSERT(POINTER_DISTANCE(pPtr, buffer) <= (int)(len - sizeof(TSCKSUM))); + } + + *numOfIdx = nIdx; + return 0; +} + int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); SFile *pFile = helperHeadF(pHelper); - int fd = pFile->fd; if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { // If not load from file, just load it in object @@ -456,54 +513,18 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { return -1; } - if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosTRead(fd, (void *)(pHelper->pBuffer), pFile->info.len) < (int)pFile->info.len) { - tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, - pFile->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + // Load SCompIdx binary from file + if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) { return -1; } - if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) { - tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.len); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + // Decode the SCompIdx part + if (tsdbDecodeSCompIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray), + &(pHelper->idxH.numOfIdx)) < 0) { + tsdbError("vgId:%d failed to decode SCompIdx part from file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, + tstrerror(errno)); return -1; } - - // Decode it - pHelper->idxH.numOfIdx = 0; - void *ptr = pHelper->pBuffer; - while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (int)(pFile->info.len - sizeof(TSCKSUM))) { - size_t tlen = taosTSizeof(pHelper->idxH.pIdxArray); - pHelper->idxH.numOfIdx++; - - if (tlen < pHelper->idxH.numOfIdx * sizeof(SCompIdx)) { - pHelper->idxH.pIdxArray = (SCompIdx *)taosTRealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2); - if (pHelper->idxH.pIdxArray == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - - ptr = tsdbDecodeSCompIdx(ptr, &(pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1])); - if (ptr == NULL) { - tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname, - pFile->info.len); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - ASSERT(pHelper->idxH.numOfIdx == 1 || pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1].tid > - pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 2].tid); - - ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= (int)(pFile->info.len - sizeof(TSCKSUM))); - } } } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); @@ -515,36 +536,49 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { return 0; } +int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) { + const char *prefixMsg = "failed to load SCompInfo/SCompBlock part"; + + if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) { + tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + *ppCompInfo = taosTRealloc((void *)(*ppCompInfo), pIdx->len); + if (*ppCompInfo == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (taosTRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) { + tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)(*ppCompInfo), pIdx->len)) { + tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, pFile->fname, pIdx->offset, pIdx->len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + return 0; +} + int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); SCompIdx *pIdx = &(pHelper->curCompIdx); - int fd = helperHeadF(pHelper)->fd; + SFile *pFile = helperHeadF(pHelper); if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { ASSERT(pIdx->uid == pHelper->tableInfo.uid); - if (lseek(fd, pIdx->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pHelper->pCompInfo = taosTRealloc((void *)pHelper->pCompInfo, pIdx->len); - if (taosTRead(fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) { - tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - helperHeadF(pHelper)->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) { - tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo), - helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } + if (tsdbLoadCompInfoImpl(pFile, pIdx, &(pHelper->pCompInfo)) < 0) return -1; ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid); } -- GitLab