diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 1f8e0f93474ada8e4d2e834872e1c04d57294931..2231008c4620ea12705c2b07521596d72bb6c117 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -257,7 +257,7 @@ typedef struct { uint32_t numOfBlocks : 30; uint64_t uid; TSKEY maxKey; -} SCompIdx; +} SBlockIdx; typedef struct { int64_t last : 1; @@ -265,19 +265,19 @@ typedef struct { int32_t algorithm : 8; int32_t numOfRows : 24; int32_t len; - int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols + int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols int16_t numOfSubBlocks; int16_t numOfCols; // not including timestamp column TSKEY keyFirst; TSKEY keyLast; -} SCompBlock; +} SBlock; typedef struct { int32_t delimiter; // For recovery usage int32_t tid; uint64_t uid; - SCompBlock blocks[]; -} SCompInfo; + SBlock blocks[]; +} SBlockInfo; typedef struct { int16_t colId; @@ -291,14 +291,14 @@ typedef struct { int16_t minIndex; int16_t numOfNull; char padding[2]; -} SCompCol; +} SBlockCol; typedef struct { int32_t delimiter; // For recovery usage int32_t numOfCols; // For recovery usage uint64_t uid; // For recovery usage - SCompCol cols[]; -} SCompData; + SBlockCol cols[]; +} SBlockData; typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; @@ -316,7 +316,7 @@ typedef struct { } SHelperTable; typedef struct { - SCompIdx* pIdxArray; + SBlockIdx* pIdxArray; int numOfIdx; int curIdx; } SIdxH; @@ -329,14 +329,14 @@ typedef struct { // For file set usage SHelperFile files; SIdxH idxH; - SCompIdx curCompIdx; + SBlockIdx curCompIdx; void* pWIdx; // For table set usage SHelperTable tableInfo; - SCompInfo* pCompInfo; + SBlockInfo* pCompInfo; bool hasOldLastBlock; // For block set usage - SCompData* pCompData; + SBlockData* pCompData; SDataCols* pDataCols[2]; void* pBuffer; // Buffer to hold the whole data block void* compBuffer; // Buffer for temperary compress/decompress purpose @@ -355,8 +355,8 @@ typedef struct { typedef struct { SFileGroup fGroup; int numOfIdx; - SCompIdx* pCompIdx; - SCompInfo* pCompInfo; + SBlockIdx* pCompIdx; + SBlockInfo* pCompInfo; void* pBuf; FILE* tLogStream; } STsdbScanHandle; @@ -535,10 +535,10 @@ int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set -#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded +#define TSDB_HELPER_IDX_LOAD 0x2 // SBlockIdx part is loaded #define TSDB_HELPER_TABLE_SET 0x4 // Table is set -#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded -#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded +#define TSDB_HELPER_INFO_LOAD 0x8 // SBlockInfo part is loaded +#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SBlockData part is loaded #define helperSetState(h, s) (((h)->state) |= (s)) #define helperClearState(h, s) ((h)->state &= (~(s))) #define helperHasState(h, s) ((((h)->state) & (s)) == (s)) @@ -568,15 +568,15 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbWriteCompInfo(SRWHelper* pHelper); int tsdbWriteCompIdx(SRWHelper* pHelper); int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); -int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SCompIdx** ppCompIdx, int* numOfIdx); +int tsdbDecodeSBlockIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx); int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); -int tsdbLoadCompInfoImpl(SFile* pFile, SCompIdx* pIdx, SCompInfo** ppCompInfo); +int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo); int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); -int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); +int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target); void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds, +int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds, int numOfColIds); -int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo); +int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo); static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { if (*(TSKEY*)key1 > *(TSKEY*)key2) { @@ -608,8 +608,8 @@ int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int STsdbScanHandle* tsdbNewScanHandle(); void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream); int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid); -int tsdbScanSCompIdx(STsdbScanHandle* pScanHandle); -int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx); +int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle); +int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx); int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index e361c36ab87224668855d55c527150aadd6e3006..1aa7538ed15d81a5b2dd7056b29d4f73bca31a56 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -207,7 +207,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { newLast = TSDB_NLAST_FILE_OPENED(pHelper); if (tsdbLoadCompIdx(pHelper, NULL) < 0) { - tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d failed to load SBlockIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } @@ -243,7 +243,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { goto _err; } - // Write the SCompBlock part + // Write the SBlock part if (tsdbWriteCompInfo(pHelper) < 0) { tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 81b004ce3c3ece66437717acdadbfff1e5bec4ac..43ceada8cf2e446b9e55d4faf062879c5e8267a3 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -715,7 +715,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; - SCompIdx *pIdx = &(rhelper.curCompIdx); + SBlockIdx *pIdx = &(rhelper.curCompIdx); if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index d9d58fccf316acf0f4a90ace3b246fe3838decb3..fb2fc36efd5353b24fb0b67c2677caeda940b81e 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -22,19 +22,19 @@ #include "tscompression.h" #include "tsdbMain.h" -#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) +#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) #define TSDB_KEY_COL_OFFSET 0 -#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock)) +#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SBlock)) #define TSDB_IS_LAST_BLOCK(pb) ((pb)->last) static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock, bool isLast, bool isSuperBlock); static int compareKeyBlock(const void *arg1, const void *arg2); static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo); -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx); +static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo); +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx); static void tsdbResetHelperFileImpl(SRWHelper *pHelper); static int tsdbInitHelperFile(SRWHelper *pHelper); static void tsdbDestroyHelperFile(SRWHelper *pHelper); @@ -48,21 +48,21 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper); static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize); -static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, +static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds); -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); -static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); -static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols); +static int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); +static void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey); static void tsdbDestroyHelperBlock(SRWHelper *pHelper); -static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, +static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, SDataCol *pDataCol); -static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock); +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock); static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, int *blkIdx); static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); -static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SCompBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps); +static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps); static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx); // ---------------------- INTERNAL FUNCTIONS ---------------------- @@ -242,28 +242,28 @@ int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { if (pHelper->idxH.numOfIdx > 0) { while (true) { if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) { - memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); break; } - SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); + SBlockIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); if (pIdx->tid == TABLE_TID(pTable)) { if (pIdx->uid == TABLE_UID(pTable)) { pHelper->curCompIdx = *pIdx; } else { - memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); } pHelper->idxH.curIdx++; break; } else if (pIdx->tid > TABLE_TID(pTable)) { - memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); break; } else { pHelper->idxH.curIdx++; } } } else { - memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); + memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); } if (helperType(pHelper) == TSDB_WRITE_HELPER && pHelper->curCompIdx.hasLast) { @@ -279,7 +279,7 @@ int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SCompIdx *pIdx = &(pHelper->curCompIdx); + SBlockIdx *pIdx = &(pHelper->curCompIdx); int blkIdx = 0; ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); @@ -305,12 +305,12 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { STsdbCfg *pCfg = &pHelper->pRepo->config; ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SCompIdx * pIdx = &(pHelper->curCompIdx); - SCompBlock compBlock = {0}; + SBlockIdx * pIdx = &(pHelper->curCompIdx); + SBlock compBlock = {0}; if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); + SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last); if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && @@ -360,7 +360,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { } int tsdbWriteCompInfo(SRWHelper *pHelper) { - SCompIdx *pIdx = &(pHelper->curCompIdx); + SBlockIdx *pIdx = &(pHelper->curCompIdx); off_t offset = 0; SFile * pFile = helperNewHeadF(pHelper); @@ -371,8 +371,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->uid = pHelper->tableInfo.uid; pHelper->pCompInfo->tid = pHelper->tableInfo.tid; - ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && - (pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); + ASSERT(pIdx->len > sizeof(SBlockInfo) + sizeof(TSCKSUM) && + (pIdx->len - sizeof(SBlockInfo) - sizeof(TSCKSUM)) % sizeof(SBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); } @@ -396,7 +396,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { return -1; } - if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) { + if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SBlockIdx) + 12) { pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2); if (pHelper->pWIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -405,7 +405,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); - pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); + pFile->info.len += tsdbEncodeSBlockIdx(&pBuf, &(pHelper->curCompIdx)); pFile->info.size += pIdx->len; // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); @@ -456,7 +456,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { } int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) { - const char *prefixMsg = "failed to load SCompIdx part"; + const char *prefixMsg = "failed to load SBlockIdx part"; if (lseek(pFile->fd, offset, SEEK_SET) < 0) { tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), offset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -479,23 +479,23 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe return 0; } -int tsdbDecodeSCompIdxImpl(void *buffer, uint32_t len, SCompIdx **ppCompIdx, int *numOfIdx) { +int tsdbDecodeSBlockIdxImpl(void *buffer, uint32_t len, SBlockIdx **ppCompIdx, int *numOfIdx) { int nIdx = 0; void *pPtr = buffer; while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) { size_t tlen = taosTSizeof(*ppCompIdx); - if (tlen < sizeof(SCompIdx) * (nIdx + 1)) { - *ppCompIdx = (SCompIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2); + if (tlen < sizeof(SBlockIdx) * (nIdx + 1)) { + *ppCompIdx = (SBlockIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2); if (*ppCompIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } } - pPtr = tsdbDecodeSCompIdx(pPtr, &((*ppCompIdx)[nIdx])); + pPtr = tsdbDecodeSBlockIdx(pPtr, &((*ppCompIdx)[nIdx])); if (pPtr == NULL) { - tsdbError("failed to decode SCompIdx part, idx:%d", nIdx); + tsdbError("failed to decode SBlockIdx part, idx:%d", nIdx); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -522,15 +522,15 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { return -1; } - // Load SCompIdx binary from file + // Load SBlockIdx binary from file if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) { return -1; } - // Decode the SCompIdx part - if (tsdbDecodeSCompIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray), + // Decode the SBlockIdx part + if (tsdbDecodeSBlockIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray), &(pHelper->idxH.numOfIdx)) < 0) { - tsdbError("vgId:%d failed to decode SCompIdx part from file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), + tsdbError("vgId:%d failed to decode SBlockIdx part from file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), tstrerror(errno)); return -1; } @@ -540,13 +540,13 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { // Copy the memory for outside usage if (target && pHelper->idxH.numOfIdx > 0) - memcpy(target, pHelper->idxH.pIdxArray, sizeof(SCompIdx) * pHelper->idxH.numOfIdx); + memcpy(target, pHelper->idxH.pIdxArray, sizeof(SBlockIdx) * pHelper->idxH.numOfIdx); return 0; } -int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) { - const char *prefixMsg = "failed to load SCompInfo/SCompBlock part"; +int tsdbLoadCompInfoImpl(SFile *pFile, SBlockIdx *pIdx, SBlockInfo **ppCompInfo) { + const char *prefixMsg = "failed to load SBlockInfo/SBlock part"; if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) { tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), pIdx->offset, strerror(errno)); @@ -579,7 +579,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) { int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); - SCompIdx *pIdx = &(pHelper->curCompIdx); + SBlockIdx *pIdx = &(pHelper->curCompIdx); SFile *pFile = helperHeadF(pHelper); @@ -600,7 +600,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { return 0; } -int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { +int tsdbLoadCompData(SRWHelper *pHelper, SBlock *pCompBlock, void *target) { ASSERT(pCompBlock->numOfSubBlocks <= 1); SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); @@ -639,7 +639,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { } void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) { - SCompData *pCompData = pHelper->pCompData; + SBlockData *pCompData = pHelper->pCompData; for (int i = 0, j = 0; i < numOfCols;) { if (j >= pCompData->numOfCols) { @@ -666,13 +666,13 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) } } -int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo, int16_t *colIds, int numOfColIds) { +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SBlock *pCompBlock, SBlockInfo *pCompInfo, int16_t *colIds, int numOfColIds) { ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block - SCompBlock *pTCompBlock = pCompBlock; + SBlock *pTCompBlock = pCompBlock; int numOfSubBlocks = pCompBlock->numOfSubBlocks; if (numOfSubBlocks > 1) - pTCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); + pTCompBlock = (SBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); tdResetDataCols(pHelper->pDataCols[0]); if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err; @@ -693,12 +693,12 @@ _err: return -1; } -int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo) { - SCompBlock *pTCompBlock = pCompBlock; +int tsdbLoadBlockData(SRWHelper *pHelper, SBlock *pCompBlock, SBlockInfo *pCompInfo) { + SBlock *pTCompBlock = pCompBlock; int numOfSubBlock = pCompBlock->numOfSubBlocks; if (numOfSubBlock > 1) - pTCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); + pTCompBlock = (SBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); tdResetDataCols(pHelper->pDataCols[0]); if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[0]) < 0) goto _err; @@ -728,10 +728,10 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { return false; } -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock, bool isLast, bool isSuperBlock) { STsdbCfg * pCfg = &(pHelper->pRepo->config); - SCompData *pCompData = (SCompData *)(pHelper->pBuffer); + SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); int64_t offset = 0; int rowsToWrite = pDataCols->numOfRows; @@ -749,7 +749,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa int nColsNotAllNull = 0; for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column SDataCol *pDataCol = pDataCols->cols + ncol; - SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; + SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it continue; @@ -779,7 +779,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa if (ncol != 0 && tcol >= nColsNotAllNull) break; SDataCol *pDataCol = pDataCols->cols + ncol; - SCompCol *pCompCol = pCompData->cols + tcol; + SBlockCol *pCompCol = pCompData->cols + tcol; if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; void *tptr = POINTER_SHIFT(pCompData, lsize); @@ -868,7 +868,7 @@ _err: static int compareKeyBlock(const void *arg1, const void *arg2) { TSKEY key = *(TSKEY *)arg1; - SCompBlock *pBlock = (SCompBlock *)arg2; + SBlock *pBlock = (SBlock *)arg2; if (key < pBlock->keyFirst) { return -1; @@ -881,42 +881,42 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { if (taosTSizeof((void *)pHelper->pCompInfo) <= esize) { - size_t tsize = esize + sizeof(SCompBlock) * 16; - pHelper->pCompInfo = (SCompInfo *)taosTRealloc(pHelper->pCompInfo, tsize); + size_t tsize = esize + sizeof(SBlock) * 16; + pHelper->pCompInfo = (SBlockInfo *)taosTRealloc(pHelper->pCompInfo, tsize); if (pHelper->pCompInfo == NULL) return -1; } return 0; } -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { - SCompIdx *pIdx = &(pHelper->curCompIdx); +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { + SBlockIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks); ASSERT(pCompBlock->numOfSubBlocks == 1); // Adjust memory if no more room - if (pIdx->len == 0) pIdx->len = sizeof(SCompInfo) + sizeof(TSCKSUM); - if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err; + if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM); + if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err; // Change the offset for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) { - SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); + SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); } // Memmove if needed - int tsize = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx); + int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx); if (tsize > 0) { - ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo)); - ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo)); - memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)), - POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize); + ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo)); + ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo)); + memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)), + POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize); } pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pIdx->numOfBlocks++; - pIdx->len += sizeof(SCompBlock); + pIdx->len += sizeof(SBlock); ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo)); pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; @@ -936,47 +936,47 @@ _err: return -1; } -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo) { +static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo) { ASSERT(pCompBlock->numOfSubBlocks == 0); - SCompIdx *pIdx = &(pHelper->curCompIdx); + SBlockIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); - SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); + SBlock *pSBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(pSBlock->numOfSubBlocks >= 1 && pSBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); size_t spaceNeeded = - (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock); + (pSBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock); if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; - pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + pSBlock = pHelper->pCompInfo->blocks + blkIdx; // Add the sub-block - if (pSCompBlock->numOfSubBlocks > 1) { - size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); + if (pSBlock->numOfSubBlocks > 1) { + size_t tsize = (size_t)(pIdx->len - (pSBlock->offset + pSBlock->len)); if (tsize > 0) { - memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)), - (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + memmove((void *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len + sizeof(SBlock)), + (void *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len), tsize); for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); + SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); } } - *(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; + *(SBlock *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len) = *pCompBlock; - pSCompBlock->numOfSubBlocks++; - ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); - pSCompBlock->len += sizeof(SCompBlock); - pSCompBlock->numOfRows = pSCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; - pSCompBlock->keyFirst = pMergeInfo->keyFirst; - pSCompBlock->keyLast = pMergeInfo->keyLast; - pIdx->len += sizeof(SCompBlock); + pSBlock->numOfSubBlocks++; + ASSERT(pSBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); + pSBlock->len += sizeof(SBlock); + pSBlock->numOfRows = pSBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; + pSBlock->keyFirst = pMergeInfo->keyFirst; + pSBlock->keyLast = pMergeInfo->keyLast; + pIdx->len += sizeof(SBlock); } else { // Need to create two sub-blocks void *ptr = NULL; for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; if (pTCompBlock->numOfSubBlocks > 1) { ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset); break; @@ -987,26 +987,26 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); if (tsize > 0) { - memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tsize); + memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize); for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2); + SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2); } } - ((SCompBlock *)ptr)[0] = *pSCompBlock; - ((SCompBlock *)ptr)[0].numOfSubBlocks = 0; + ((SBlock *)ptr)[0] = *pSBlock; + ((SBlock *)ptr)[0].numOfSubBlocks = 0; - ((SCompBlock *)ptr)[1] = *pCompBlock; + ((SBlock *)ptr)[1] = *pCompBlock; - pSCompBlock->numOfSubBlocks = 2; - pSCompBlock->numOfRows = pSCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; - pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); - pSCompBlock->len = sizeof(SCompBlock) * 2; - pSCompBlock->keyFirst = pMergeInfo->keyFirst; - pSCompBlock->keyLast = pMergeInfo->keyLast; + pSBlock->numOfSubBlocks = 2; + pSBlock->numOfRows = pSBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; + pSBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); + pSBlock->len = sizeof(SBlock) * 2; + pSBlock->keyFirst = pMergeInfo->keyFirst; + pSBlock->keyLast = pMergeInfo->keyLast; - pIdx->len += (sizeof(SCompBlock) * 2); + pIdx->len += (sizeof(SBlock) * 2); } pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; @@ -1020,34 +1020,34 @@ _err: return -1; } -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { ASSERT(pCompBlock->numOfSubBlocks == 1); - SCompIdx *pIdx = &(pHelper->curCompIdx); + SBlockIdx *pIdx = &(pHelper->curCompIdx); ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); - SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + SBlock *pSBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(pSCompBlock->numOfSubBlocks >= 1); + ASSERT(pSBlock->numOfSubBlocks >= 1); // Delete the sub blocks it has - if (pSCompBlock->numOfSubBlocks > 1) { - size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); + if (pSBlock->numOfSubBlocks > 1) { + size_t tsize = (size_t)(pIdx->len - (pSBlock->offset + pSBlock->len)); if (tsize > 0) { - memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset), - POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize); + memmove(POINTER_SHIFT(pHelper->pCompInfo, pSBlock->offset), + POINTER_SHIFT(pHelper->pCompInfo, pSBlock->offset + pSBlock->len), tsize); } for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); + SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSBlock->numOfSubBlocks); } - pIdx->len -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); + pIdx->len -= (sizeof(SBlock) * pSBlock->numOfSubBlocks); } - *pSCompBlock = *pCompBlock; + *pSBlock = *pCompBlock; pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; @@ -1061,12 +1061,12 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int } static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) { - SCompIdx *pCompIdx = &(pHelper->curCompIdx); + SBlockIdx *pCompIdx = &(pHelper->curCompIdx); ASSERT(pCompIdx->numOfBlocks > 0 && blkIdx < pCompIdx->numOfBlocks); - SCompBlock *pCompBlock= blockAtIdx(pHelper, blkIdx); - SCompBlock compBlock = *pCompBlock; + SBlock *pCompBlock= blockAtIdx(pHelper, blkIdx); + SBlock compBlock = *pCompBlock; ASSERT(pCompBlock->numOfSubBlocks > 0 && pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); if (pCompIdx->numOfBlocks == 1) { @@ -1075,21 +1075,21 @@ static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) { int tsize = 0; if (compBlock.numOfSubBlocks > 1) { - tsize = (int)(pCompIdx->len - (compBlock.offset + sizeof(SCompBlock) * compBlock.numOfSubBlocks)); + tsize = (int)(pCompIdx->len - (compBlock.offset + sizeof(SBlock) * compBlock.numOfSubBlocks)); ASSERT(tsize > 0); memmove(POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset), - POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset + sizeof(SCompBlock) * compBlock.numOfSubBlocks), + POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset + sizeof(SBlock) * compBlock.numOfSubBlocks), tsize); - pCompIdx->len = pCompIdx->len - sizeof(SCompBlock) * compBlock.numOfSubBlocks; + pCompIdx->len = pCompIdx->len - sizeof(SBlock) * compBlock.numOfSubBlocks; } tsize = (int)(pCompIdx->len - POINTER_DISTANCE(blockAtIdx(pHelper, blkIdx + 1), pHelper->pCompInfo)); ASSERT(tsize > 0); memmove((void *)blockAtIdx(pHelper, blkIdx), (void *)blockAtIdx(pHelper, blkIdx + 1), tsize); - pCompIdx->len -= sizeof(SCompBlock); + pCompIdx->len -= sizeof(SBlock); pCompIdx->numOfBlocks--; pCompIdx->hasLast = (uint32_t)(blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->last); @@ -1191,7 +1191,7 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t // TODO: pMeta->maxRowBytes and pMeta->maxCols may change here causing invalid write pHelper->pBuffer = - taosTMalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols + + taosTMalloc(sizeof(SBlockData) + (sizeof(SBlockCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols + pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM)); if (pHelper->pBuffer == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -1239,7 +1239,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 return 0; } -static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, +static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, SDataCol *pDataCol) { ASSERT(pDataCol->colId == pCompCol->colId); int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; @@ -1280,14 +1280,14 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl return 0; } -static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { +static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); - SCompCol compCol = {0}; + SBlockCol compCol = {0}; - // If only load timestamp column, no need to load SCompData part + // If only load timestamp column, no need to load SBlockData part if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; pDataCols->numOfRows = pCompBlock->numOfRows; @@ -1297,7 +1297,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, for (int i = 0; i < numOfColIds; i++) { int16_t colId = colIds[i]; SDataCol *pDataCol = NULL; - SCompCol *pCompCol = NULL; + SBlockCol *pCompCol = NULL; while (true) { if (dcol >= pDataCols->numOfCols) { @@ -1357,7 +1357,7 @@ _err: return -1; } -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols) { ASSERT(pCompBlock->numOfSubBlocks <= 1); SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); @@ -1368,7 +1368,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa goto _err; } - SCompData *pCompData = (SCompData *)pHelper->pBuffer; + SBlockData *pCompData = (SBlockData *)pHelper->pBuffer; int fd = pFile->fd; if (lseek(fd, (off_t)pCompBlock->offset, SEEK_SET) < 0) { @@ -1396,7 +1396,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa pDataCols->numOfRows = pCompBlock->numOfRows; // Recover the data - int ccol = 0; // loop iter for SCompCol object + int ccol = 0; // loop iter for SBlockCol object int dcol = 0; // loop iter for SDataCols object while (dcol < pDataCols->numOfCols) { SDataCol *pDataCol = &(pDataCols->cols[dcol]); @@ -1412,7 +1412,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa int32_t tlen = pCompBlock->keyLen; if (dcol != 0) { - SCompCol *pCompCol = &(pCompData->cols[ccol]); + SBlockCol *pCompCol = &(pCompData->cols[ccol]); tcolId = pCompCol->colId; toffset = pCompCol->offset; tlen = pCompCol->len; @@ -1456,7 +1456,7 @@ _err: return -1; } -static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) { +static int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) { int tlen = 0; tlen += taosEncodeVariantI32(buf, pIdx->tid); @@ -1470,7 +1470,7 @@ static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) { return tlen; } -static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { +static void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { uint8_t hasLast = 0; uint32_t numOfBlocks = 0; uint64_t value = 0; @@ -1493,17 +1493,17 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { STsdbCfg * pCfg = &(pHelper->pRepo->config); STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = &(pHelper->curCompIdx); + SBlockIdx * pIdx = &(pHelper->curCompIdx); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SCompBlock compBlock = {0}; + SBlock compBlock = {0}; SMergeInfo mergeInfo = {0}; SMergeInfo *pMergeInfo = &mergeInfo; ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); if (pIdx->hasLast) { // append to with last block ASSERT(pIdx->len > 0); - SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); + SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, NULL, 0, pCfg->update, pMergeInfo); @@ -1556,21 +1556,21 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int *blkIdx) { STsdbCfg * pCfg = &(pHelper->pRepo->config); STable * pTable = pCommitIter->pTable; - SCompIdx * pIdx = &(pHelper->curCompIdx); - SCompBlock compBlock = {0}; + SBlockIdx * pIdx = &(pHelper->curCompIdx); + SBlock compBlock = {0}; TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; SDataCols * pDataCols0 = pHelper->pDataCols[0]; SMergeInfo mergeInfo = {0}; SMergeInfo *pMergeInfo = &mergeInfo; - SCompBlock oBlock = {0}; + SBlock oBlock = {0}; SSkipListIterator slIter = {0}; ASSERT(keyFirst <= pIdx->maxKey); - SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), - pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); + SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), + pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); ASSERT(pCompBlock != NULL); int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); oBlock = *pCompBlock; @@ -1722,7 +1722,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt } } -static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) { +static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) { STsdbCfg *pCfg = &(pHelper->pRepo->config); SFile * pFile = NULL; bool isLast = false; @@ -1743,7 +1743,7 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, return 0; } -static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SCompBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps) { +static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps) { STsdbCfg *pCfg = &(pHelper->pRepo->config); int mergeRows = pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index d5cc566b5541a41e863d4c136746019b4b172ce0..7f063692f49527fe337dcc1c45576a7ca419bdc8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -69,7 +69,7 @@ typedef struct STableCheckInfo { STableId tableId; TSKEY lastKey; STable* pTableObj; - SCompInfo* pCompInfo; + SBlockInfo* pCompInfo; int32_t compSize; int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks int8_t chosen:2; // indicate which iterator should move forward @@ -79,7 +79,7 @@ typedef struct STableCheckInfo { } STableCheckInfo; typedef struct STableBlockInfo { - SCompBlock* compBlock; + SBlock* compBlock; STableCheckInfo* pTableCheckInfo; } STableBlockInfo; @@ -136,7 +136,7 @@ typedef struct STableGroupSupporter { static STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbQueryHandle* pQueryHandle); @@ -669,7 +669,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio return (int32_t)fid; } -static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { +static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { int32_t firstSlot = 0; int32_t lastSlot = numOfBlocks - 1; @@ -712,7 +712,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo break; } - SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; + SBlockIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { @@ -729,12 +729,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo break; } - pCheckInfo->pCompInfo = (SCompInfo*) t; + pCheckInfo->pCompInfo = (SBlockInfo*) t; pCheckInfo->compSize = compIndex->len; } tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); - SCompInfo* pCompInfo = pCheckInfo->pCompInfo; + SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; @@ -763,7 +763,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->numOfBlocks = (end - start); if (start > 0) { - memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock)); + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); } (*numOfBlocks) += pCheckInfo->numOfBlocks; @@ -772,7 +772,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo return code; } -static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { +static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { int64_t st = taosGetTimestampUs(); STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); @@ -838,7 +838,7 @@ static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, i static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle); static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); -static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ +static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){ SQueryFilePos* cur = &pQueryHandle->cur; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); @@ -921,7 +921,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc return code; } -static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { +static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { SQueryFilePos* cur = &pQueryHandle->cur; int32_t code = TSDB_CODE_SUCCESS; @@ -1327,7 +1327,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) { SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; @@ -1626,7 +1626,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO continue; } - SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; + SBlock* pBlock = pTableCheck->pCompInfo->blocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); @@ -2316,7 +2316,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { return pHandle->pColumns; } else { // only load the file block - SCompBlock* pBlock = pBlockInfo->compBlock; + SBlock* pBlock = pBlockInfo->compBlock; if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) { return NULL; } diff --git a/src/tsdb/src/tsdbScan.c b/src/tsdb/src/tsdbScan.c index 91f67878740c90d56a8c5c124be23c1f53e7da40..06a8cc69442a313ebc140d5c84e973c49063cc21 100644 --- a/src/tsdb/src/tsdbScan.c +++ b/src/tsdb/src/tsdbScan.c @@ -25,9 +25,9 @@ void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream) {} int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; } -int tsdbScanSCompIdx(STsdbScanHandle* pScanHandle) { return 0; } +int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle) { return 0; } -int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; } +int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; } int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; }