From 5c796c8bda9fafcf28f7929d93a7293c5bd9f02e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 9 Jan 2021 21:36:24 +0800 Subject: [PATCH] refact --- src/tsdb/inc/tsdbFile.h | 41 ++++------ src/tsdb/inc/tsdbReadImpl.h | 22 +++--- src/tsdb/inc/tsdbint.h | 4 + src/tsdb/src/tsdbFile.c | 1 + src/tsdb/src/tsdbReadImpl.c | 149 +++++++++++++++++++----------------- 5 files changed, 112 insertions(+), 105 deletions(-) diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 57c365c48d..aa6a5629fc 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -40,15 +40,6 @@ typedef enum { TSDB_FILE_MANIFEST } TSDB_FILE_T; -#define tsdbOpenFile(T, f, flags) tsdbOpen##T(f, flags) -#define tsdbCloseFile(T, f) tsdbClose##T(f) -#define tsdbSeekFile(T, f, offset, whence) tsdbSeek##T(f, offset, whence) -#define tsdbWriteFile(T, f, buf, nbytes) tsdbWrite##T(f, buf, nbytes) -#define tsdbUpdateFileMagic(T, f, pCksum) tsdbUpdate##T##Magic(f, pCksum) -#define tsdbTellFile(T, f) tsdbTell##T(f) -#define tsdbEncodeFile(T, buf, f) tsdbEncode##T(buf, f) -#define tsdbDecodeFile(T, buf, f) tsdbDecode##T(buf, f) - // =============== SMFile typedef struct { int64_t size; @@ -68,7 +59,7 @@ void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); -static FORCE_INLINE int tsdbOpenSMFile(SMFile* pMFile, int flags) { +static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pMFile)); pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags); @@ -80,14 +71,14 @@ static FORCE_INLINE int tsdbOpenSMFile(SMFile* pMFile, int flags) { return 0; } -static FORCE_INLINE void tsdbCloseSMFile(SMFile* pMFile) { +static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) { if (TSDB_FILE_OPENED(pMFile)) { close(pMFile->fd); TSDB_FILE_SET_CLOSED(pMFile); } } -static FORCE_INLINE int64_t tsdbSeekSMFile(SMFile* pMFile, int64_t offset, int whence) { +static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) { ASSERT(TSDB_FILE_OPENED(pMFile)); int64_t loffset = taosLSeek(TSDB_FILE_FD(pMFile), offset, whence); @@ -99,7 +90,7 @@ static FORCE_INLINE int64_t tsdbSeekSMFile(SMFile* pMFile, int64_t offset, int w return loffset; } -static FORCE_INLINE int64_t tsdbWriteSMFile(SMFile* pMFile, void* buf, int64_t nbyte) { +static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pMFile)); int64_t nwrite = taosWrite(pMFile->fd, buf, nbyte); @@ -111,11 +102,11 @@ static FORCE_INLINE int64_t tsdbWriteSMFile(SMFile* pMFile, void* buf, int64_t n return nwrite; } -static FORCE_INLINE void tsdbUpdateSMFileMagic(SMFile* pMFile, void* pCksum) { +static FORCE_INLINE void tsdbUpdateMFileMagic(SMFile* pMFile, void* pCksum) { pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t*)(pCksum), sizeof(TSCKSUM)); } -static FORCE_INLINE int64_t tsdbTellSMFile(SMFile* pMFile) { return tsdbSeekSMFile(pMFile, 0, SEEK_CUR); } +static FORCE_INLINE int64_t tsdbTellMFile(SMFile* pMFile) { return tsdbSeekMFile(pMFile, 0, SEEK_CUR); } // =============== SDFile typedef struct { @@ -140,7 +131,7 @@ void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); -static FORCE_INLINE int tsdbOpenSDFile(SDFile *pDFile, int flags) { +static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pDFile)); pDFile->fd = open(pDFile->f.aname, flags); @@ -152,14 +143,14 @@ static FORCE_INLINE int tsdbOpenSDFile(SDFile *pDFile, int flags) { return 0; } -static FORCE_INLINE void tsdbCloseSDFile(SDFile* pDFile) { +static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) { if (TSDB_FILE_OPENED(pDFile)) { close(pDFile->fd); TSDB_FILE_SET_CLOSED(pDFile); } } -static FORCE_INLINE int64_t tsdbSeekSDFile(SDFile *pDFile, int64_t offset, int whence) { +static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t loffset = taosLSeek(pDFile->fd, offset, whence); @@ -171,7 +162,7 @@ static FORCE_INLINE int64_t tsdbSeekSDFile(SDFile *pDFile, int64_t offset, int w return loffset; } -static FORCE_INLINE int64_t tsdbWriteSDFile(SDFile* pDFile, void* buf, int64_t nbyte) { +static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t nwrite = taosWrite(pDFile->fd, buf, nbyte); @@ -183,20 +174,20 @@ static FORCE_INLINE int64_t tsdbWriteSDFile(SDFile* pDFile, void* buf, int64_t n return nwrite; } -static FORCE_INLINE int64_t tsdbAppendSDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) { +static FORCE_INLINE int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t nwrite; - *offset = tsdbSeekSDFile(pDFile, 0, SEEK_SET); + *offset = tsdbSeekDFile(pDFile, 0, SEEK_SET); if (*offset < 0) return -1; - nwrite = tsdbWriteSDFile(pDFile, buf, nbyte); + nwrite = tsdbWriteDFile(pDFile, buf, nbyte); if (nwrite < 0) return nwrite; return nwrite; } -static FORCE_INLINE int64_t tsdbReadSDFile(SDFile* pDFile, void* buf, int64_t nbyte) { +static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t nread = taosRead(pDFile->fd, buf, nbyte); @@ -208,9 +199,9 @@ static FORCE_INLINE int64_t tsdbReadSDFile(SDFile* pDFile, void* buf, int64_t nb return nread; } -static FORCE_INLINE int64_t tsdbTellSDFile(SDFile *pDFile) { return tsdbSeekSDFile(pDFile, 0, SEEK_CUR); } +static FORCE_INLINE int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); } -static FORCE_INLINE void tsdbUpdateSDFileMagic(SDFile* pDFile, void* pCksm) { +static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) { pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM)); } diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index df113f8b93..99dcef3ffc 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -41,18 +41,18 @@ typedef struct { int32_t algorithm : 8; int32_t numOfRows : 24; int32_t len; - int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols + int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols int16_t numOfSubBlocks; - int16_t numOfCols; // not including timestamp column + int16_t numOfCols; // not including timestamp column TSKEY keyFirst; TSKEY keyLast; } SBlock; typedef struct { - int32_t delimiter; // For recovery usage - int32_t tid; - uint64_t uid; - SBlock blocks[]; + int32_t delimiter; // For recovery usage + int32_t tid; + uint64_t uid; + SBlock blocks[]; } SBlockInfo; typedef struct { @@ -70,9 +70,9 @@ typedef struct { } SBlockCol; typedef struct { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - uint64_t uid; // For recovery usage + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + uint64_t uid; // For recovery usage SBlockCol cols[]; } SBlockData; @@ -92,7 +92,7 @@ struct SReadH { #define TSDB_READ_REPO(rh) ((rh)->pRepo) #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) -#define TSDB_READ_FSET(rh) &((rh)->rSet) +#define TSDB_READ_FSET(rh) (&((rh)->rSet)) #define TSDB_READ_TABLE(ch) ((rh)->pTable) #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) @@ -111,7 +111,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo); int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds, - const int numOfColsIds); + int numOfColsIds); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 893169dff0..23851b3521 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -18,7 +18,10 @@ // TODO: remove the include #include +#include #include +#include +#include #include "os.h" #include "tlog.h" @@ -27,6 +30,7 @@ #include "tchecksum.h" #include "tskiplist.h" #include "tdataformat.h" +#include "tscompression.h" #include "tlockfree.h" #include "tlist.h" #include "hash.h" diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index d18f720d3b..43387144d4 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -174,6 +174,7 @@ int tsdbOpenDFileSet(SDFileSet *pSet, int flags) { void tsdbCloseDFileSet(SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); tsdbCloseDFile(pDFile); } } diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 85352b1dcc..9948a9f3c5 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ -#include "tchecksum.h" -#include "tsdbMain.h" +#include "tsdbint.h" #define TSDB_KEY_COL_OFFSET 0 -static void tsdbResetReadH(SReadH *pReadh); +static void tsdbResetReadTable(SReadH *pReadh); +static void tsdbResetReadFile(SReadH *pReadh); static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize); @@ -27,31 +27,31 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDat static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { - ASSERT(pReadh != NULL); + ASSERT(pReadh != NULL && pRepo != NULL); + STsdbCfg *pCfg = REPO_CFG(pRepo); + + memset((void *)pReadh, 0, sizeof(*pReadh)); pReadh->pRepo = pRepo; - pReadh->aBlkIdx = taosArrayInit(sizeof(SBlockIdx), 1024); - if (pReadh->aBlkIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; + for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { + TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh), ftype)); } - pReadh->pDCols[0] = tdNewDataCols(); - if (pReadh->pDCols[0] == NULL) { + pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (pReadh->aBlkIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyReadH(pReadh); return -1; } - pReadh->pDCols[0] = tdNewDataCols(); + pReadh->pDCols[0] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); if (pReadh->pDCols[0] == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyReadH(pReadh); return -1; } - pReadh->pDCols[1] = tdNewDataCols(); + pReadh->pDCols[1] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); if (pReadh->pDCols[1] == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyReadH(pReadh); @@ -69,7 +69,7 @@ void tsdbDestroyReadH(SReadH *pReadh) { pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]); pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]); pReadh->pBlkData = taosTZfree(pReadh->pBlkData); - pReadh->pBlkInfo = tdFreeDataCols(pReadh->pBlkInfo); + pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo); pReadh->cidx = 0; pReadh->pBlkIdx = NULL; pReadh->pTable = NULL; @@ -79,20 +79,22 @@ void tsdbDestroyReadH(SReadH *pReadh) { } int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) { - tsdbResetReadH(pReadh); + ASSERT(pSet != NULL); + tsdbResetReadFile(pReadh); pReadh->rSet = *pSet; + for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { + TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh), ftype)); + } if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), O_RDONLY) < 0) return -1; return 0; } -void tsdbCloseAndUnsetFSet(SReadH *pReadh) { - // TODO -} +void tsdbCloseAndUnsetFSet(SReadH *pReadh) { tsdbResetReadFile(pReadh); } int tsdbLoadBlockIdx(SReadH *pReadh) { - SDFile * pHeadf = TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh)); + SDFile * pHeadf = TSDB_READ_HEAD_FILE(pReadh); SBlockIdx blkIdx; ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0); @@ -144,7 +146,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) { tsize++; ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid < - ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid) + ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid); } return 0; @@ -153,6 +155,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh) { int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + pReadh->pTable = pTable; + if (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -163,33 +167,32 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { return -1; } - size_t size = taosArrayGetSize(pReadh->aBlkIdx); if (size > 0) { while (true) { if (pReadh->cidx >= size) { - pReadh->pBlockIdx = NULL; + pReadh->pBlkIdx = NULL; break; } SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx); if (pBlkIdx->tid == TABLE_TID(pTable)) { if (pBlkIdx->uid == TABLE_UID(pTable)) { - pReadh->pBlockIdx = pBlkIdx; + pReadh->pBlkIdx = pBlkIdx; } else { - pReadh->pBlockIdx = NULL; + pReadh->pBlkIdx = NULL; } pReadh->cidx++; break; } else if (pBlkIdx->tid > TABLE_TID(pTable)) { - pReadh->pBlockIdx = NULL; + pReadh->pBlkIdx = NULL; break; } else { pReadh->cidx++; } } } else { - pReadh->pBlockIdx = NULL; + pReadh->pBlkIdx = NULL; } return 0; @@ -202,8 +205,8 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { SBlockIdx *pBlkIdx = pReadh->pBlkIdx; if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s to offset %u since %s", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, tstrerror(terrno)); + tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len); return -1; } @@ -218,7 +221,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { if (nread < pBlkIdx->len) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes: %" PRId64, + tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%" PRId64, TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, nread); return -1; } @@ -231,28 +234,26 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { } if (pTarget) { - memcpy(pTarget, (void *)pReadh->pBlkInfo, pBlkIdx->len); + memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len); } return 0; } -int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo) { +int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo) { ASSERT(pBlock->numOfSubBlocks > 0); const SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { - if (pBlockInfo) { - iBlock = POINTER_SHIFT(pBlockInfo, pBlock->offset); + if (pBlkInfo) { + iBlock = (SBlock *)POINTER_SHIFT(pBlkInfo, pBlock->offset); } else { - iBlock = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset); + iBlock = (SBlock *)POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset); } } - tdResetDataCols(pReadh->pDCols[0]); if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0]) < 0) return -1; for (int i = 1; i < pBlock->numOfSubBlocks; i++) { - tdResetDataCols(pReadh->pDCols[1]); iBlock++; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; @@ -265,23 +266,21 @@ int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pB return 0; } -int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds, - const int numOfColsIds) { +int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo, const int16_t *colIds, + int numOfColsIds) { ASSERT(pBlock->numOfSubBlocks > 0); const SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { - if (pBlockInfo) { - iBlock = POINTER_SHIFT(pBlockInfo, pBlock->offset); + if (pBlkInfo) { + iBlock = POINTER_SHIFT(pBlkInfo, pBlock->offset); } else { iBlock = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset); } } - tdResetDataCols(pReadh->pDCols[0]); if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds) < 0) return -1; for (int i = 1; i < pBlock->numOfSubBlocks; i++) { - tdResetDataCols(pReadh->pDCols[1]); iBlock++; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; @@ -297,21 +296,21 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { ASSERT(pBlock->numOfSubBlocks <= 1); - SDFile *pFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); + SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); - if (tsdbSeekDFile(pFile, pBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %u since %s", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlock->offset, tstrerror(terrno)); + if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno)); return -1; } size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); - if (tsdbMakeRoom((void **)(&(pReadh->pBlkData), size)) < 0) return -1; + if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1; - int64_t nread = tsdbReadDFile(pFile, (void *)(pReadh->pBlkData), size); + int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size); if (nread < 0) { tsdbError("vgId:%d failed to load block statis part while read file %s sinces %s, offset:%" PRId64 " len :%" PRIzu, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), tstrerror(terrno), pBlock->offset, size); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, size); return -1; } @@ -319,14 +318,14 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu " read bytes: %" PRId64, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pBlock->offset, size, nread); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size, nread); return -1; } if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), size)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pBlock->offset, size); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size); return -1; } @@ -395,12 +394,16 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) { } } -static void tsdbResetReadH(SReadH *pReadh) { +static void tsdbResetReadTable(SReadH *pReadh) { tdResetDataCols(pReadh->pDCols[0]); tdResetDataCols(pReadh->pDCols[1]); pReadh->cidx = 0; pReadh->pBlkIdx = NULL; pReadh->pTable = NULL; +} + +static void tsdbResetReadFile(SReadH *pReadh) { + tsdbResetReadTable(pReadh); taosArrayClear(pReadh->aBlkIdx); tsdbCloseDFileSet(TSDB_READ_FSET(pReadh)); } @@ -410,17 +413,18 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_HEAD_FILE(pReadh); - if (tsdbMakeRoom((void **)(&(pReadh->pBuf)), pBlock->len) < 0) return -1; + tdResetDataCols(pDataCols); + if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1; - SBlockData *pBlockData = (SBlockData *)(pReadh->pBuf); + SBlockData *pBlockData = (SBlockData *)TSDB_READ_BUF(pReadh); if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to load block data part while seek file %s to offset %u since %s", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlock->offset, tstrerror(terrno)); + tsdbError("vgId:%d failed to load block data part while seek file %s to offset %" PRId64 " since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno)); return -1; } - int64_t nread = tsdbReadDFile(pDFile, pReadh->pBuf, pBlock->len); + int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len); if (nread < 0) { tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, pBlock->len); @@ -429,20 +433,21 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols if (nread < pBlock->len) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64 " expected bytes:%d" PRIzu - " read bytes: %" PRId64, + tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64 + " expected bytes:%d read bytes: %" PRId64, TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, pBlock->len, nread); return -1; } int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); - if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBuf), tsize)) { + if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tsize); return -1; } + ASSERT(tsize < pBlock->len); ASSERT(pBlockData->numOfCols == pBlock->numOfCols); pDataCols->numOfRows = pBlock->numOfRows; @@ -475,21 +480,24 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols if (tcolId == pDataCol->colId) { if (pBlock->algorithm == TWO_STAGE_COMP) { int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { zsize += (sizeof(VarDataLenT) * pBlock->numOfRows); } - if (tsdbMakeRoom((void **)(&(pReadh->pCBuf)), zsize) < 0) return -1; + if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) return -1; } if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm, - pBlock->numOfRows, pDataCols->maxPoints, pReadh->pCBuf, - (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { + pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh), + (int32_t)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) { tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset); return -1; } - if (dcol != 0) ccol++; + + if (dcol != 0) { + ccol++; + } dcol++; } else if (tcolId < pDataCol->colId) { ccol++; @@ -528,19 +536,22 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 memcpy(pDataCol->pData, content, pDataCol->len); } - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { dataColSetOffset(pDataCol, numOfRows); } return 0; } -static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds) { ASSERT(pBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); SBlockCol blockCol = {0}; + tdResetDataCols(pDataCols); + // If only load timestamp column, no need to load SBlockData part if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1; @@ -584,7 +595,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SData break; } - pBlockCol = &(pReadh->pBlockData->cols[ccol]); + pBlockCol = &(pReadh->pBlkData->cols[ccol]); if (pBlockCol->colId > colId) { pBlockCol = NULL; break; @@ -642,7 +653,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pFile), + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pDFile), pBlockCol->colId, offset); return -1; } -- GitLab