diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 66cb0bb4ba5c48e6320455a94d5b6778c283fbe4..69c2618ac88935e37427b97fae36a4df564f73cf 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -51,6 +51,10 @@ typedef enum { } ECheckItemType; typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +typedef enum { + TSDB_STATIS_OK = 0, // statis part exist and load successfully + TSDB_STATIS_NONE = 1, // statis part not exist +} ETsdbStatisStatus; extern char *qtypeStr[]; diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 677ca9c3367a85045318bd1d09f75d0fa725f188..6bc89ddd66e919afb642bc6db47ffe269719e007 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -27,12 +27,12 @@ extern "C" { typedef struct SDataStatis { int16_t colId; - int64_t sum; - int64_t max; - int64_t min; int16_t maxIndex; int16_t minIndex; int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; } SDataStatis; typedef struct STable { @@ -53,6 +53,8 @@ typedef struct STsdb STsdb; typedef struct STsdbCfg { int8_t precision; + int8_t update; + int8_t compression; uint64_t lruCacheSize; int32_t daysPerFile; int32_t minRowsPerFileBlock; @@ -60,8 +62,6 @@ typedef struct STsdbCfg { int32_t keep; int32_t keep1; int32_t keep2; - int8_t update; - int8_t compression; } STsdbCfg; // query condition to build multi-table data block iterator diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index af432aa9d9f836ada62d6ff59c7cd9566b145336..dab697ce8bd39ea9c7497dd73e08bd94b553fffa 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -18,8 +18,6 @@ #include "tsdbFile.h" -#define TSDB_FS_VERSION 0 - // ================== TSDB global config extern bool tsdbForceKeepFile; diff --git a/source/dnode/vnode/src/inc/tsdbFile.h b/source/dnode/vnode/src/inc/tsdbFile.h index 15c8d512d6d54beca9871da0021b5881e4a5a1f9..bbeb6c6a0f70b241a6857df29141bdd4400349ba 100644 --- a/source/dnode/vnode/src/inc/tsdbFile.h +++ b/source/dnode/vnode/src/inc/tsdbFile.h @@ -44,7 +44,37 @@ #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) -typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; +typedef enum { + TSDB_FILE_HEAD = 0, // .head + TSDB_FILE_DATA, // .data + TSDB_FILE_LAST, // .last + TSDB_FILE_SMAD, // .smad(Block-wise SMA) + TSDB_FILE_SMAL, // .smal(Block-wise SMA) + TSDB_FILE_MAX, // + TSDB_FILE_META, // meta + TSDB_FILE_TSMA, // .tsma.${sma_index_name}, Time-range-wise SMA + TSDB_FILE_RSMA, // .rsma.${sma_index_name}, Time-range-wise Rollup SMA +} TSDB_FILE_T; + +typedef enum { + TSDB_FS_VER_0 = 0, + TSDB_FS_VER_MAX, +} ETsdbFsVer; + +#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile +#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file + +static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile + switch (fType) { + case TSDB_FILE_HEAD: // .head + case TSDB_FILE_DATA: // .data + case TSDB_FILE_LAST: // .last + case TSDB_FILE_SMAD: // .smad(Block-wise SMA) + case TSDB_FILE_SMAL: // .smal(Block-wise SMA) + default: + return TSDB_LATEST_FVER; + } +} #if 0 // =============== SMFile @@ -169,6 +199,7 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby // =============== SDFile typedef struct { uint32_t magic; + uint32_t fver; uint32_t len; uint32_t totalBlocks; uint32_t totalSubBlocks; @@ -188,7 +219,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile); void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile); -int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader); +int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType); int tsdbUpdateDFileHeader(SDFile* pDFile); int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo); int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version); @@ -292,12 +323,18 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { // =============== SDFileSet typedef struct { - int fid; - int state; - SDFile files[TSDB_FILE_MAX]; + int fid; + int8_t state; // -128~127 + uint8_t ver; // 0~255, DFileSet version + uint16_t reserve; + SDFile files[TSDB_FILE_MAX]; } SDFileSet; +#define TSDB_LATEST_FSET_VER 0 + #define TSDB_FSET_FID(s) ((s)->fid) +#define TSDB_FSET_STATE(s) ((s)->state) +#define TSDB_FSET_VER(s) ((s)->ver) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) diff --git a/source/dnode/vnode/src/inc/tsdbReadImpl.h b/source/dnode/vnode/src/inc/tsdbReadImpl.h index 8ecb4e4a6ebe7a8b15a7eeef8cb6d48e5eb8d889..16eb55967caac0697b2b35088b9b4473c86ba8a2 100644 --- a/source/dnode/vnode/src/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/src/inc/tsdbReadImpl.h @@ -36,6 +36,7 @@ typedef struct { TSKEY maxKey; } SBlockIdx; +#ifdef TD_REFACTOR_3 typedef struct { int64_t last : 1; int64_t offset : 63; @@ -49,22 +50,35 @@ typedef struct { TSKEY keyLast; } SBlock; +#else + +typedef enum { + TSDB_SBLK_VER_0 = 0, + TSDB_SBLK_VER_MAX, +} ESBlockVer; + +#define SBlockVerLatest TSDB_SBLK_VER_0 + typedef struct { - int64_t last : 1; - int64_t offset : 63; - int32_t algorithm : 8; - int32_t numOfRows : 24; - uint8_t reserve0; + uint8_t last : 1; + uint8_t blkVer : 7; uint8_t numOfSubBlocks; int16_t numOfCols; // not including timestamp column - uint32_t len : 32; // data block length + uint32_t len; // data block length uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols - uint32_t reserve1 : 8; - uint64_t blkVer : 8; - uint64_t aggrOffset : 56; + uint32_t reserve : 8; + int32_t algorithm : 8; + int32_t numOfRows : 24; + int64_t offset; + uint64_t aggrStat : 1; + uint64_t aggrOffset : 63; TSKEY keyFirst; TSKEY keyLast; -} SBlock_3; +} SBlockV0; + +#define SBlock SBlockV0 // latest SBlock definition + +#endif typedef struct { int32_t delimiter; // For recovery usage @@ -73,6 +87,7 @@ typedef struct { SBlock blocks[]; } SBlockInfo; +#ifdef TD_REFACTOR_3 typedef struct { int16_t colId; uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM @@ -89,17 +104,50 @@ typedef struct { uint8_t offsetH; char padding[1]; } SBlockCol; +#else +typedef struct { + int16_t colId; + uint8_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM + uint8_t reserve : 7; + uint8_t type; + int32_t len; + uint32_t offset; +} SBlockColV0; + +#define SBlockCol SBlockColV0 // latest SBlockCol definition + +typedef struct { + int16_t colId; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; +} SAggrBlkColV0; + +#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition + +#endif // Code here just for back-ward compatibility static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) { +#ifdef TD_REFACTOR_3 pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1); pBlockCol->offsetH = (uint8_t)(offset >> 24); +#else + pBlockCol->offset = offset; +#endif } static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) { +#ifdef TD_REFACTOR_3 uint32_t offset1 = pBlockCol->offset; uint32_t offset2 = pBlockCol->offsetH; return (offset1 | (offset2 << 24)); +#else + return pBlockCol->offset; +#endif } typedef struct { @@ -109,31 +157,57 @@ typedef struct { SBlockCol cols[]; } SBlockData; +typedef void SAggrBlkData; // SBlockCol cols[]; + struct SReadH { - STsdb * pRepo; - SDFileSet rSet; // FSET to read - SArray * aBlkIdx; // SBlockIdx array - STable * pTable; // table to read - SBlockIdx * pBlkIdx; // current reading table SBlockIdx - int cidx; - SBlockInfo *pBlkInfo; - SBlockData *pBlkData; // Block info - SDataCols * pDCols[2]; - void * pBuf; // buffer - void * pCBuf; // compression buffer + STsdb * pRepo; + SDFileSet rSet; // FSET to read + SArray * aBlkIdx; // SBlockIdx array + STable * pTable; // table to read + SBlockIdx * pBlkIdx; // current reading table SBlockIdx + int cidx; + SBlockInfo * pBlkInfo; + SBlockData * pBlkData; // Block info + SAggrBlkData *pAggrBlkData; // Aggregate Block info + SDataCols * pDCols[2]; + void * pBuf; // buffer + void * pCBuf; // compression buffer + void * pExBuf; // extra buffer }; -#define TSDB_READ_REPO(rh) ((rh)->pRepo) -#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) -#define TSDB_READ_FSET(rh) (&((rh)->rSet)) -#define TSDB_READ_TABLE(rh) ((rh)->pTable) +#define TSDB_READ_REPO(rh) ((rh)->pRepo) +#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) +#define TSDB_READ_FSET(rh) (&((rh)->rSet)) +#define TSDB_READ_TABLE(rh) ((rh)->pTable) #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) -#define TSDB_READ_BUF(rh) ((rh)->pBuf) -#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) +#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD) +#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL) +#define TSDB_READ_BUF(rh) ((rh)->pBuf) +#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) +#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) + +#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \ + (sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM)) + +static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { + switch (blkVer) { + case TSDB_SBLK_VER_0: + default: + return TSDB_BLOCK_STATIS_SIZE(nCols, 0); + } +} -#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) +#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM)) + +static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { + switch (blkVer) { + case TSDB_SBLK_VER_0: + default: + return TSDB_BLOCK_AGGR_SIZE(nCols, 0); + } +} int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); void tsdbDestroyReadH(SReadH *pReadh); @@ -147,7 +221,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); -void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols); +void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock); static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { void * pBuf = *ppBuf; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index f7e4d56fe2c22e77895b9b6971c8a937b6277c68..2e7116969e1e0f3d418b53b084b236469f94daad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -50,8 +50,11 @@ typedef struct { #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) +#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD) +#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) +#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) @@ -509,7 +512,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // TSDB_FILE_HEAD SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); - if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) { + if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), tstrerror(terrno)); @@ -560,7 +563,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); pCommith->isLFileSame = false; - if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) { + if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), tstrerror(terrno)); @@ -572,6 +575,74 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid } } } + + // TSDB_FILE_SMAD + SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh)); + SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith); + + if (access(TSDB_FILE_FULL_NAME(pRSmadF), F_OK) != 0) { + tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF)); + tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD); + + if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + (void)tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } else { + tsdbInitDFileEx(pWSmadF, pRSmadF); + if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } + + // TSDB_FILE_SMAL + SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh)); + SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith); + + if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) { + tsdbInitDFileEx(pWSmalF, pRSmalF); + if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } else { + tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF)); + tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL); + + if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + (void)tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } } return 0; @@ -1131,41 +1202,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, - bool isSuper, void **ppBuf, void **ppCBuf) { - STsdbCfg *pCfg = REPO_CFG(pRepo); - SBlockData *pBlockData; - int64_t offset = 0; - int rowsToWrite = pDataCols->numOfRows; +int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, + SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { + STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlockData * pBlockData = NULL; + SAggrBlkData *pAggrBlkData = NULL; + int64_t offset = 0, offsetAggr = 0; + int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); // Make buffer space - if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { + if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { return -1; } pBlockData = (SBlockData *)(*ppBuf); + if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { + return -1; + } + pAggrBlkData = (SAggrBlkData *)(*ppExBuf); + // Get # of cols not all NULL(not including key column) int nColsNotAllNull = 0; - for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; + for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull; + SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull; if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it continue; } memset(pBlockCol, 0, sizeof(*pBlockCol)); + memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol)); pBlockCol->colId = pDataCol->colId; pBlockCol->type = pDataCol->type; + pAggrBlkCol->colId = pDataCol->colId; + if (tDataTypes[pDataCol->type].statisFunc) { +#if 0 (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); - if (pBlockCol->numOfNull == 0) { +#endif + (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max), + &(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), + &(pAggrBlkCol->numOfNull)); + + if (pAggrBlkCol->numOfNull == 0) { TD_SET_COL_ROWS_NORM(pBlockCol); } else { TD_SET_COL_ROWS_MISC(pBlockCol); @@ -1181,13 +1268,14 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * // Compress the data if neccessary int tcol = 0; // counter of not all NULL and written columns uint32_t toffset = 0; - int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); + int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest); int32_t lsize = tsize; + uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest); int32_t keyLen = 0; int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite); int32_t tBitmaps = 0; - for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) { // All not NULL columns finish if (ncol != 0 && tcol >= nColsNotAllNull) break; @@ -1248,7 +1336,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * if (ncol != 0) { tsdbSetBlockColOffset(pBlockCol, toffset); pBlockCol->len = flen; - tcol++; + ++tcol; } else { keyLen = flen; } @@ -1269,6 +1357,18 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * return -1; } + uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0; + if (aggrStatus > 0) { + + taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); + tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); + + // Write the whole block to file + if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) { + return -1; + } + } + // Update pBlock membership vairables pBlock->last = isLast; pBlock->offset = offset; @@ -1280,6 +1380,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * pBlock->numOfCols = nColsNotAllNull; pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyLast = dataColsKeyLast(pDataCols); + pBlock->aggrStat = aggrStatus; + pBlock->blkVer = SBlockVerLatest; + pBlock->aggrOffset = (uint64_t)offsetAggr; tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, @@ -1291,9 +1394,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper) { - return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast, - isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), - (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); + return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, + isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols, + pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), + (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith)))); } static int tsdbWriteBlockInfo(SCommitH *pCommih) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 80b59d8756b82901698491f549dca4c6fcda2fdd..e24d5974af5b5a9c11344fec48d053ff932e9717 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -38,6 +38,8 @@ typedef struct { #define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD) #define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA) #define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST) +#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD) +#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL) #define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh)) #define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh)) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 24c765d3e5939c4dcb3fb9cb6244a25592b6f4bc..dcc9700d83144fc75683faaddeaf472be6918e10 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -422,7 +422,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { return -1; } - fsheader.version = TSDB_FS_VERSION; + fsheader.version = TSDB_LATEST_SFS_VER; if (taosArrayGetSize(pStatus->df) == 0) { fsheader.len = 0; } else { @@ -697,7 +697,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) { ptr = tsdbDecodeFSHeader(ptr, &fsheader); ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta)); - if (fsheader.version != TSDB_FS_VERSION) { + if (fsheader.version != TSDB_LATEST_SFS_VER) { // TODO: handle file version change } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 5a00fa20a0d530189a41b048ad1a1b0e515248ef..4d70c429ffc86b9330671067f670a70d15242869 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -19,8 +19,12 @@ static const char *TSDB_FNAME_SUFFIX[] = { "head", // TSDB_FILE_HEAD "data", // TSDB_FILE_DATA "last", // TSDB_FILE_LAST + "smad", // TSDB_FILE_SMAD + "smal", // TSDB_FILE_SMAL "", // TSDB_FILE_MAX "meta", // TSDB_FILE_META + "tsma", // TSDB_FILE_TSMA + "rsma", // TSDB_FILE_RSMA }; static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); @@ -304,6 +308,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t memset(&(pDFile->info), 0, sizeof(pDFile->info)); pDFile->info.magic = TSDB_FILE_INIT_MAGIC; + pDFile->info.fver = tsdbGetDFSVersion(ftype); tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname); tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname); @@ -341,7 +346,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) { } static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { - char *aname; + char *aname = NULL; buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); buf = taosDecodeString(buf, &aname); @@ -352,7 +357,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { return buf; } -int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) { +int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); @@ -382,6 +387,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) { } pDFile->info.size += TSDB_FILE_HEAD_SIZE; + pDFile->info.fver = tsdbGetDFSVersion(fType); if (tsdbUpdateDFileHeader(pDFile) < 0) { tsdbCloseDFile(pDFile); @@ -493,6 +499,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { int tlen = 0; tlen += taosEncodeFixedU32(buf, pInfo->magic); + tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedU32(buf, pInfo->len); tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); @@ -505,6 +512,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + buf = taosDecodeFixedU32(buf, &(pInfo->fver)); buf = taosDecodeFixedU32(buf, &(pInfo->len)); buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); @@ -562,8 +570,10 @@ static int tsdbRollBackDFile(SDFile *pDFile) { // ============== Operations on SDFileSet void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) { - pSet->fid = fid; - pSet->state = 0; + TSDB_FSET_FID(pSet) = fid; + TSDB_FSET_VER(pSet) = TSDB_LATEST_FSET_VER; + TSDB_FSET_STATE(pSet) = 0; + pSet->reserve = 0; for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); @@ -572,7 +582,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3 } void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) { - pSet->fid = pOSet->fid; + TSDB_FSET_FID(pSet) = TSDB_FSET_FID(pOSet); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype)); } @@ -581,7 +591,10 @@ void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) { int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pSet->fid); + tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet)); + // state not included + tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet)); + tlen += taosEncodeFixedU16(buf, pSet->reserve); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } @@ -590,11 +603,11 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { } void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) { - int32_t fid; + buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet))); + TSDB_FSET_STATE(pSet) = 0; + buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet))); + buf = taosDecodeFixedU16(buf, &(pSet->reserve)); - buf = taosDecodeFixedI32(buf, &(fid)); - pSet->state = 0; - pSet->fid = fid; for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype)); } @@ -604,7 +617,9 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) { int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pSet->fid); + tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet)); + tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet)); + tlen += taosEncodeFixedU16(buf, pSet->reserve); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } @@ -613,10 +628,10 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) { } void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) { - int32_t fid; + buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet))); + buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet))); + buf = taosDecodeFixedU16(buf, &(pSet->reserve)); - buf = taosDecodeFixedI32(buf, &(fid)); - pSet->fid = fid; for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } @@ -637,7 +652,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) { + if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) { tsdbCloseDFileSet(pSet); tsdbRemoveDFileSet(pSet); return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 812ec67ec49dc4901a18fc39ad2b12ed9e8a1429..2d8c4701132a9c9818775654df8751b5b4a2ec75 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -821,9 +821,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // file block with sub-blocks has no statistics data if (pBlock->numOfSubBlocks <= 1) { - tsdbLoadBlockStatis(pReadh, pBlock); - tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns); - loadStatisData = true; + if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) { + tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock); + loadStatisData = true; + } } for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c1cf776cb542110f6089ab9e684d2e2756a8040c..4bd0ee959e0db8e9b84e7e02566e497e15dcb066 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3276,8 +3276,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati } int64_t stime = taosGetTimestampUs(); - if (tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock) < 0) { + int statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock); + if (statisStatus < TSDB_STATIS_OK) { return terrno; + } else if (statisStatus > TSDB_STATIS_OK) { + *pBlockStatis = NULL; + return TSDB_CODE_SUCCESS; } int16_t* colIds = pHandle->defaultLoadColumn->pData; @@ -3288,7 +3292,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati pHandle->statis[i].colId = colIds[i]; } - tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols); + tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock); // always load the first primary timestamp column data SDataStatis* pPrimaryColStatis = &pHandle->statis[0]; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 7e8e7866d71addb8a125998bf3fa54963b2466cb..f6827eaae187dfb3da31110c81dcf38c24a67702 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -19,6 +19,7 @@ static void tsdbResetReadTable(SReadH *pReadh); static void tsdbResetReadFile(SReadH *pReadh); +static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock); static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, int numOfBitmaps, int lenOfBitmaps, int maxPoints, char *buffer, @@ -63,10 +64,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) { void tsdbDestroyReadH(SReadH *pReadh) { if (pReadh == NULL) return; + pReadh->pExBuf = taosTZfree(pReadh->pExBuf); pReadh->pCBuf = taosTZfree(pReadh->pCBuf); pReadh->pBuf = taosTZfree(pReadh->pBuf); pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]); pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]); + pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData); pReadh->pBlkData = taosTZfree(pReadh->pBlkData); pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo); pReadh->cidx = 0; @@ -305,15 +308,58 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { ASSERT(pBlock->numOfSubBlocks <= 1); - SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); + if (!pBlock->aggrStat) { + return TSDB_STATIS_NONE; + } + + SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh); + if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, + tstrerror(terrno)); + return -1; + } + + size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer); + if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1; + + int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr); + if (nreadAggr < 0) { + tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno), + (uint64_t)pBlock->aggrOffset, sizeAggr); + return -1; + } + + if (nreadAggr < sizeAggr) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu + " read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr, + nreadAggr); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr); + return -1; + } + return 0; +} + +static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) { + ASSERT(pBlock->numOfSubBlocks <= 1); + SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno)); return -1; } - size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); + size_t size = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer); if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1; int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size); @@ -337,7 +383,6 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size); return -1; } - return 0; } @@ -375,13 +420,14 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { return buf; } -void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) { +void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) { +#ifdef TD_REFACTOR_3 SBlockData *pBlockData = pReadh->pBlkData; for (int i = 0, j = 0; i < numOfCols;) { if (j >= pBlockData->numOfCols) { pStatis[i].numOfNull = -1; - i++; + ++i; continue; } @@ -392,15 +438,45 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) { pStatis[i].maxIndex = pBlockData->cols[j].maxIndex; pStatis[i].minIndex = pBlockData->cols[j].minIndex; pStatis[i].numOfNull = pBlockData->cols[j].numOfNull; - i++; - j++; + ++i; + ++j; } else if (pStatis[i].colId < pBlockData->cols[j].colId) { pStatis[i].numOfNull = -1; - i++; + ++i; } else { - j++; + ++j; } } +#else + if (pBlock->aggrStat) { + SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData; + + for (int i = 0, j = 0; i < numOfCols;) { + if (j >= pBlock->numOfCols) { + pStatis[i].numOfNull = -1; + ++i; + continue; + } + SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData)) + j; + if (pStatis[i].colId == pAggrBlkCol->colId) { + pStatis[i].sum = pAggrBlkCol->sum; + pStatis[i].max = pAggrBlkCol->max; + pStatis[i].min = pAggrBlkCol->min; + pStatis[i].maxIndex = pAggrBlkCol->maxIndex; + pStatis[i].minIndex = pAggrBlkCol->minIndex; + pStatis[i].numOfNull = pAggrBlkCol->numOfNull; + ++i; + ++j; + } else if (pStatis[i].colId < pAggrBlkCol->colId) { + pStatis[i].numOfNull = -1; + ++i; + } else { + ++j; + } + } + } + +#endif } static void tsdbResetReadTable(SReadH *pReadh) { @@ -449,7 +525,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat return -1; } - int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); + int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer); if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d", @@ -592,7 +668,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * tdResetDataCols(pDataCols); // If only load timestamp column, no need to load SBlockData part - if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1; + if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1; pDataCols->numOfRows = pBlock->numOfRows; @@ -686,7 +762,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1; - int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + tsdbGetBlockColOffset(pBlockCol); + int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) + + tsdbGetBlockColOffset(pBlockCol); if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno)); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c8cafb93dde0b08edc329ebba1394bb047c3518b..176290ba38db5043ed4c4cfb07acdef0181a171e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")