From 1adfaef80da7da5143c5d796b464ee066b881752 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Oct 2020 08:05:40 +0000 Subject: [PATCH] refactor more code --- src/tsdb/inc/tsdbMain.h | 101 +------------------------- src/tsdb/src/tsdbReadUtil.c | 140 +++++++++++++++++++++--------------- src/tsdb/src/tsdbUtil.c | 24 ++++++- 3 files changed, 106 insertions(+), 159 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7d3f1c530d..da58b9e83d 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -272,48 +272,6 @@ typedef struct { SBlockCol cols[]; } SBlockData; -// typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; - -// typedef struct { -// TSKEY minKey; -// TSKEY maxKey; -// SFileGroup fGroup; -// SFile nHeadF; -// SFile nLastF; -// } SHelperFile; - -// typedef struct { -// uint64_t uid; -// int32_t tid; -// } SHelperTable; - -// typedef struct { -// SBlockIdx* pIdxArray; -// int numOfIdx; -// int curIdx; -// } SIdxH; - -// typedef struct { -// tsdb_rw_helper_t type; - -// STsdbRepo* pRepo; -// int8_t state; -// // For file set usage -// SHelperFile files; -// SIdxH idxH; -// SBlockIdx curCompIdx; -// void* pWIdx; -// // For table set usage -// SHelperTable tableInfo; -// SBlockInfo* pCompInfo; -// bool hasOldLastBlock; -// // For block set usage -// SBlockData* pCompData; -// SDataCols* pDataCols[2]; -// void* pBuffer; // Buffer to hold the whole data block -// void* compBuffer; // Buffer for temperary compress/decompress purpose -// } SRWHelper; - // ------------------ tsdbScan.c typedef struct { SFileGroup fGroup; @@ -342,7 +300,6 @@ typedef struct { void* pCBuf; } SReadHandle; -#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)])) #define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) // Operations @@ -506,62 +463,6 @@ int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); -// ------------------ tsdbRWHelper.c -// #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state -// #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set -// #define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded -// #define TSDB_HELPER_TABLE_SET 0x4 // Table is set -// #define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded -// #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded -// #define helperSetState(h, s) (((h)->state) |= (s)) -// #define helperClearState(h, s) ((h)->state &= (~(s))) -// #define helperHasState(h, s) ((((h)->state) & (s)) == (s)) -// #define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) -// #define TSDB_MAX_SUBBLOCKS 8 -// #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) -// #define helperType(h) (h)->type -// #define helperRepo(h) (h)->pRepo -// #define helperState(h) (h)->state -// #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) -// #define helperFileId(h) ((h)->files.fGroup.fileId) -// #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) -// #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) -// #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) -// #define helperNewHeadF(h) (&((h)->files.nHeadF)) -// #define helperNewLastF(h) (&((h)->files.nLastF)) - -// int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -// int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -// void tsdbDestroyHelper(SRWHelper* pHelper); -// void tsdbResetHelper(SRWHelper* pHelper); -// int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); -// int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup); -// int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); -// int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); -// int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); -// int tsdbWriteCompInfo(SRWHelper* pHelper); -// int tsdbWriteCompIdx(SRWHelper* pHelper); -// int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); -// int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx); -// int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); -// int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo); -// int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); -// int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target); -// void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -// int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds, -// int numOfColIds); -// int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo); - -// static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { -// if (*(TSKEY*)key1 > *(TSKEY*)key2) { -// return 1; -// } else if (*(TSKEY*)key1 == *(TSKEY*)key2) { -// return 0; -// } else { -// return -1; -// } -// } - // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId #define IS_REPO_LOCKED(r) (r)->repoLocked @@ -609,7 +510,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); #define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)])) -int tsdbAllocBuf(void** ppBuf, int size); +int tsdbAllocBuf(void **ppBuf, uint32_t size); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index e8f88ff0be..0f2c20fcbf 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include #include #include #include @@ -23,6 +24,15 @@ #define TSDB_KEY_COL_OFFSET 0 +static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols); +static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds); +static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH); +static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bsize); +static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); + SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH)); if (pReadH == NULL) { @@ -35,6 +45,7 @@ SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbCfg * pCfg = &(pRepo->config); + // TODO: make the memory allocation on demand if ((pReadH->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL || (pReadH->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -61,6 +72,8 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) { } int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { + ASSERT(pReadH != NULL && pFGroup != NULL); + STsdbRepo *pRepo = pReadH->pRepo; STsdbCfg * pCfg = &(pRepo->config); @@ -69,7 +82,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { tsdbResetFGroupFd(&(pReadH->fGroup)); for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = TSDB_READ_FILE(pReadH, type); + SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type); if (pFile->fname[0] != '\0') { // pFile->fname[0] == '\0' is for commit usage pFile->fd = open(pFile->fname, O_RDONLY); @@ -89,7 +102,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = TSDB_READ_FILE(pReadH, type); + SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type); if (pFile->fd >= 0) { (void)close(pFile->fd); @@ -99,15 +112,19 @@ void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) { } int tsdbLoadBlockIdx(SReadHandle *pReadH) { + ASSERT(pReadH != NULL); + STsdbRepo *pRepo = pReadH->pRepo; - SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); + SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD); - if (pFile->fd < 0 || pFile->info.len == 0) { + if (pFile->fd < 0 || pFile->info.len == 0) { // for backward compatibility pReadH->nBlockIdx = 0; pReadH->pCurBlockIdx = NULL; return 0; } + ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len); + if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -129,15 +146,15 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { } if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), pFile->info.len)) { - tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname, - pFile->info.offset, pFile->info.len); + tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u file size %" PRIu64, REPO_ID(pRepo), + pFile->fname, pFile->info.offset, pFile->info.len, pFile->info.size); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } if (tsdbDecodeBlockIdxArray(pReadH) < 0) { - tsdbError("vgId:%d error occurs while decoding block idx part from file %s", REPO_ID(pRepo), pFile->fname); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d error occurs while decoding block idx part from file %s since %s", REPO_ID(pRepo), pFile->fname, + tstrerror(terrno)); return -1; } @@ -147,7 +164,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { } int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { - ASSERT(pTable != NULL); + ASSERT(pReadH != NULL && pTable != NULL); pReadH->pTable = pTable; @@ -162,6 +179,7 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { if (pReadH->nBlockIdx > 0) { ASSERT(pReadH->cBlockIdx <= pReadH->nBlockIdx); + // linear search TABLE_TID(pTable) while (true) { if (pReadH->cBlockIdx >= pReadH->nBlockIdx) { pReadH->pCurBlockIdx = NULL; @@ -192,15 +210,17 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { } int tsdbLoadBlockInfo(SReadHandle *pReadH) { + ASSERT(pReadH != NULL); + if (pReadH->pCurBlockIdx == NULL) return 0; - SFile * pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); + SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD); SBlockIdx *pBlockIdx = pReadH->pCurBlockIdx; STsdbRepo *pRepo = pReadH->pRepo; - ASSERT(pFile->fd > 0); + ASSERT(pFile->fd > 0 && pBlockIdx->len > 0); - if (tsdbAllocBuf(&((void *)pReadH->pBlockInfo), pBlockIdx->len) < 0) { + if (tsdbAllocBuf(&((void *)(pReadH->pBlockInfo)), pBlockIdx->len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -221,7 +241,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { } if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) { - tsdbError("vgId:%d table %s block info part is corrupted from file %s", REPO_ID(pRepo), + tsdbError("vgId:%d table %s block info part is corrupted in file %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pReadH->pTable), pFile->fname); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; @@ -270,7 +290,7 @@ int tsdbLoadBlockDataCols(SReadHandle *pReadH, SBlock *pBlock, SBlockInfo *pBloc for (int i = 1; i < nSubBlock; i++) { pSubBlock++; if (tsdbLoadBlockDataColsImpl(pReadH, pSubBlock, pReadH->pDataCols[1], colIds, numOfCols) < 0) return -1; - if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) goto _err; + if (tdMergeDataCols(pReadH->pDataCols[0], pReadH->pDataCols[1], pReadH->pDataCols[1]->numOfRows) < 0) return -1; } ASSERT(pReadH->pDataCols[0]->numOfRows == pBlock->numOfRows); @@ -284,8 +304,12 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) { ASSERT(pBlock->numOfSubBlocks <= 1); STsdbRepo *pRepo = pReadH->pRepo; - SFile * pFile = - (pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); + SFile * pFile = NULL; + if (pBlock->last) { + pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); + } else { + pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); + } if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, @@ -316,6 +340,9 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) { } ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols); + ASSERT(pReadH->pBlockData->delimiter == TSDB_FILE_DELIMITER); + ASSERT(pReadH->pBlockData->numOfCols == pBlock->numOfCols); + return 0; } @@ -323,8 +350,12 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ASSERT(pBlock->numOfSubBlocks <= 1); STsdbRepo *pRepo = pReadH->pRepo; - SFile *pFile = - (pBlock->last) ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); + SFile * pFile = NULL; + if (pBlock->last) { + pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); + } else { + pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); + } if (tsdbAllocBuf(&(pReadH->pBuf), pBlock->len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -347,8 +378,9 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols } int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols); - if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)pReadH->pBuf, tsize)) { - tsdbError("vgId:%d block data part from file %s is corrupted", REPO_ID(pRepo), pFile->fname); + if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), tsize)) { + tsdbError("vgId:%d block data part from file %s at offset %" PRId64 " len %d is corrupted", REPO_ID(pRepo), + pFile->fname, pBlock->offset, pBlock->len); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -362,8 +394,8 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols ASSERT(pBlock->numOfRows <= pDataCols->maxPoints); pDataCols->numOfRows = pBlock->numOfRows; - int ccol = 0; - int dcol = 0; + int ccol = 0; // loop iter over SBlockCols + int dcol = 0; // loop iter over pDataCols while (dcol < pDataCols->numOfCols) { SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (dcol != 0 && ccol >= pBlockData->numOfCols) { @@ -422,24 +454,32 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC int numOfColIds) { ASSERT(pBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); + ASSERT(pBlock->numOfRows <= pDataCols->maxPoints); - SFile *pFile = - pBlock->last ? TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_LAST) : TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_DATA); + SFile *pFile = NULL; + if (pBlock->last) { + pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); + } else { + pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); + } SBlockCol blockCol = {0}; - // If only load timestamp column, no need to load SBlockData part - if (numOfColIds > 1 && tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1; + // if only load the key timestamp column, no need to load SBlockData part + if (numOfColIds > 1) { + if (tsdbLoadBlockDataInfo(pReadH, pBlock) < 0) return -1; + } tdResetDataCols(pDataCols); pDataCols->numOfRows = pBlock->numOfRows; - int dcol = 0; - int ccol = 0; + int dcol = 0; // loop iter over pDataCols + int ccol = 0; // loop iter over SBlockCol for (int i = 0; i < numOfColIds; i++) { int16_t colId = colIds[i]; SDataCol * pDataCol = NULL; SBlockCol *pBlockCol = NULL; + // linear search over pDataCols of colId while (true) { if (dcol >= pDataCols->numOfCols) { pDataCol = NULL; @@ -458,6 +498,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC if (pDataCol == NULL) continue; ASSERT(pDataCol->colId == colId); + // linear search over SBlockCols if (colId == 0) { // load the key row blockCol.colId = colId; blockCol.len = pBlock->keyLen; @@ -497,7 +538,7 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { void * pBuf = pReadH->pBuf; - SFile *pFile = TSDB_READ_FILE(pReadH, TSDB_FILE_TYPE_HEAD); + SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD); pReadH->nBlockIdx = 0; while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) { @@ -508,7 +549,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx])); if (pBuf == NULL) { - tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname); + tsdbError("vgId:%d failed to decode block idx part from file %s at idx %d", REPO_ID(pRepo), pFile->fname, + pReadH->nBlockIdx); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -516,21 +558,9 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { pReadH->nBlockIdx++; ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid)); } - return 0; -} - -int tsdbAllocBuf(void **ppBuf, int size) { - void *pBuf = *pBuf; - int tsize = taosTSizeof(pBuf); - if (tsize == 0) tsize = 1024; - - while (tsize < size) { - tsize *= 2; - } - - *ppBuf = taosTRealloc(pBuf, tsize); - if (*ppBuf == NULL) return -1; + ASSERT(pReadH->nBlockIdx > 0); + return 0; } static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx) { @@ -561,21 +591,18 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return -1; } pDataCol->len = tlen; - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - dataColSetOffset(pDataCol, numOfRows); - } else { - ASSERT(pDataCol->len == pDataCol->bytes * numOfRows); - } } else { // No need to decompress, just memcpy it pDataCol->len = len - sizeof(TSCKSUM); memcpy(pDataCol->pData, content, pDataCol->len); - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - dataColSetOffset(pDataCol, numOfRows); - } else { - ASSERT(pDataCol->len == pDataCol->bytes * numOfRows); - } } + + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + dataColSetOffset(pDataCol, numOfRows); + } else { + ASSERT(pDataCol->len == pDataCol->bytes * numOfRows); + } + return 0; } @@ -591,9 +618,10 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB return -1; } - int64_t offset = pBlock->offset + TSDB_GET_COMPCOL_LEN(pBlock->numOfCols) + pBlockCol->offset; + int64_t offset = pBlock->offset + TSDB_BLOCK_DATA_LEN(pBlock->numOfCols) + pBlockCol->offset; if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, offset, + strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c index 18a23284df..d2a1bd53aa 100644 --- a/src/tsdb/src/tsdbUtil.c +++ b/src/tsdb/src/tsdbUtil.c @@ -100,14 +100,32 @@ void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) { pBlockIdx->hasLast = hasLast; pBlockIdx->numOfBlocks = numOfBlocks; - pBlockIdx->uid = value; + pBlockIdx->uid = uid; pBlockIdx->maxKey = (TSKEY)maxKey; return buf; } +// TODO: make it static FORCE_INLINE void tsdbResetFGroupFd(SFileGroup *pFGroup) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - pFGroup->files[type].fd = -1; + TSDB_FILE_IN_FGROUP(pFGroup, type)->fd = -1; } -} \ No newline at end of file +} + +int tsdbAllocBuf(void **ppBuf, uint32_t size) { + ASSERT(size > 0); + + void *pBuf = *pBuf; + + uint32_t tsize = taosTSizeof(pBuf); + if (tsize >= size) return 0; + + if (tsize == 0) tsize = 1024; + while (tsize < size) { + tsize *= 2; + } + + *ppBuf = taosTRealloc(pBuf, tsize); + if (*ppBuf == NULL) return -1; +} -- GitLab