From cabc21775f888595ed7a860b848bbab2580dfc37 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 26 Aug 2021 14:15:38 +0800 Subject: [PATCH] [TD-6117]:Multi-level aggregate query optimization --- src/common/inc/tglobal.h | 4 + src/common/src/tglobal.c | 4 + src/dnode/src/dnodeSystem.c | 5 ++ src/inc/taosdef.h | 2 + src/tsdb/inc/tsdbCommit.h | 4 +- src/tsdb/inc/tsdbFS.h | 8 +- src/tsdb/inc/tsdbFile.h | 2 +- src/tsdb/inc/tsdbReadImpl.h | 34 +++++++- src/tsdb/src/tsdbCommit.c | 73 ++++++++++++++-- src/tsdb/src/tsdbCompact.c | 18 ++-- src/tsdb/src/tsdbFile.c | 1 + src/tsdb/src/tsdbReadImpl.c | 170 +++++++++++++++++++++++++----------- 12 files changed, 255 insertions(+), 70 deletions(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 4b8347ead0..9a5209531d 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -110,6 +110,10 @@ extern int8_t tsCacheLastRow; //tsdb extern bool tsdbForceKeepFile; +#ifdef __TD_6117__ +extern bool tsdbQueryFromSMA; +#endif + // balance extern int8_t tsEnableBalance; extern int8_t tsAlternativeRole; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index f169b07bb2..92b3e58ad0 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -156,6 +156,10 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO; // For backward compatibility bool tsdbForceKeepFile = false; +#ifdef __TD_6117__ +bool tsdbQueryFromSMA = true; +#endif + // balance int8_t tsEnableBalance = 1; int8_t tsAlternativeRole = 0; diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 2f77788025..2176aadf86 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -42,6 +42,11 @@ int32_t main(int32_t argc, char *argv[]) { } } else if (strcmp(argv[i], "-C") == 0) { dump_config = 1; +#ifdef __TD_6117__ + } else if (strcmp(argv[i], "--disable-query-from-sma") == 0) { + // Read block statistics from the .data/.last file instead of the .sma file. + tsdbQueryFromSMA = false; +#endif } else if (strcmp(argv[i], "--force-keep-file") == 0) { 
tsdbForceKeepFile = true; } else if (strcmp(argv[i], "--compact-mnode-wal") == 0) { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 44b3a2cf0d..7e9e9c6909 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -447,6 +447,8 @@ typedef enum { TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +#define __TD_6117__ + extern char *qtypeStr[]; #ifdef __cplusplus diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h index cde728b170..9cb8417c45 100644 --- a/src/tsdb/inc/tsdbCommit.h +++ b/src/tsdb/inc/tsdbCommit.h @@ -38,8 +38,8 @@ void *tsdbCommitData(STsdbRepo *pRepo); int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, - bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, + SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf); int tsdbApplyRtn(STsdbRepo *pRepo); static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 3b6b6449f6..f7d234138b 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -16,7 +16,13 @@ #ifndef _TD_TSDB_FS_H_ #define _TD_TSDB_FS_H_ -#define TSDB_FS_VERSION 0 +/** + * The fileset .head/.data/.last/.sma use the same TSDB_FS_VERSION. + * 0 - original format before 2021.08.25 // TODO update date 2021.08.25 to release version. + * 1 - extract aggregation block data from .data file and save to separated .sma file since 2021.08.25 // TODO update + * date to release version. 
+ */ +#define TSDB_FS_VERSION 1 // ================== CURRENT file header info typedef struct { diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index b9d5431de6..10d119eed1 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -38,7 +38,7 @@ #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) -typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; +typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_SMA, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T; // =============== SMFile typedef struct { diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 814c4d1305..132ea0634c 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -42,6 +42,11 @@ typedef struct { int32_t numOfRows : 24; int32_t len; int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols +#ifdef __TD_6117__ + int64_t hasAggr : 1; + int64_t aggrOffset : 63; + int32_t aggrLen; +#endif int16_t numOfSubBlocks; int16_t numOfCols; // not including timestamp column TSKEY keyFirst; @@ -70,6 +75,18 @@ typedef struct { char padding[1]; } SBlockCol; +typedef struct { + int16_t colId; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; + uint8_t type; + char reserved[15]; // Adjust the size of reserved array whenever adding new field of SAggrBlkCol. 
+} SAggrBlkCol; + // Code here just for back-ward compatibility static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) { pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1); @@ -88,6 +105,12 @@ typedef struct { uint64_t uid; // For recovery usage SBlockCol cols[]; } SBlockData; +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + uint64_t uid; // For recovery usage + SAggrBlkCol cols[]; +} SAggrBlkData; struct SReadH { STsdbRepo * pRepo; @@ -98,9 +121,13 @@ struct SReadH { int cidx; SBlockInfo *pBlkInfo; SBlockData *pBlkData; // Block info +#ifdef __TD_6117__ + SAggrBlkData *pAggrBlkData; // Block info +#endif SDataCols * pDCols[2]; - void * pBuf; // buffer + void * pRBuf; // buffer void * pCBuf; // compression buffer + void * pExBuf; // extra buffer }; #define TSDB_READ_REPO(rh) ((rh)->pRepo) @@ -110,10 +137,13 @@ struct SReadH { #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) -#define TSDB_READ_BUF(rh) ((rh)->pBuf) +#define TSDB_READ_AGGR_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMA) +#define TSDB_READ_BUF(rh) ((rh)->pRBuf) #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) +#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) #define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) +#define TSDB_BLOCK_AGGR_SIZE(ncols) (sizeof(SAggrBlkData) + sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM)) int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo); void tsdbDestroyReadH(SReadH *pReadh); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 15fc3cc47d..78e40439cf 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -35,6 +35,7 @@ typedef struct { SDFileSet wSet; bool isDFileSame; bool 
isLFileSame; + bool isSmaFileSame; TSKEY minKey; TSKEY maxKey; SArray * aBlkIdx; // SBlockIdx array @@ -51,8 +52,10 @@ typedef struct { #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) +#define TSDB_COMMIT_AGGR_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMA) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) +#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) @@ -912,7 +915,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) { } else { int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision)); if (fid == TSDB_IVLD_FID || fid > tfid) { - fid = tfid; + fid = tfid; // find the least fid } } } @@ -1053,11 +1056,12 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, - bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) { +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, + SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { STsdbCfg * pCfg = REPO_CFG(pRepo); SBlockData *pBlockData; - int64_t offset = 0; + SAggrBlkData *pAggrBlkData = NULL; + int64_t offset = 0, offsetAggr = 0; int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); @@ -1069,24 +1073,38 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, 
SDataCo } pBlockData = (SBlockData *)(*ppBuf); + if (tsdbMakeRoom(ppExBuf, TSDB_BLOCK_AGGR_SIZE(pDataCols->numOfCols)) < 0) { + return -1; + } + pAggrBlkData = (SAggrBlkData *)(*ppExBuf); + // Get # of cols not all NULL(not including key column) int nColsNotAllNull = 0; for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column SDataCol * pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; + SAggrBlkCol *pAggrBlkCol = pAggrBlkData->cols + nColsNotAllNull; if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it continue; } memset(pBlockCol, 0, sizeof(*pBlockCol)); + memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol)); pBlockCol->colId = pDataCol->colId; pBlockCol->type = pDataCol->type; + + pAggrBlkCol->colId = pDataCol->colId; + pAggrBlkCol->type = pDataCol->type; + if (tDataTypes[pDataCol->type].statisFunc) { (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); + (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max), + &(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), + &(pAggrBlkCol->numOfNull)); } nColsNotAllNull++; } @@ -1099,12 +1117,15 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); int32_t lsize = tsize; int32_t keyLen = 0; + + int32_t tsizeAggr = TSDB_BLOCK_AGGR_SIZE(nColsNotAllNull); + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { // All not NULL columns finish if (ncol != 0 && tcol >= nColsNotAllNull) break; - SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + tcol; + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol * pBlockCol = pBlockData->cols + tcol; if (ncol != 0 && 
(pDataCol->colId != pBlockCol->colId)) continue; @@ -1165,6 +1186,20 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo return -1; } +#ifdef __TD_6117__ + pAggrBlkData->delimiter = TSDB_FILE_DELIMITER; + pAggrBlkData->uid = TABLE_UID(pTable); + pAggrBlkData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); + tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); + + // Write the whole block to file + if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) { + return -1; + } +#endif + // Update pBlock membership vairables pBlock->last = isLast; pBlock->offset = offset; @@ -1176,6 +1211,11 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo pBlock->numOfCols = nColsNotAllNull; pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyLast = dataColsKeyLast(pDataCols); +#ifdef __TD_6117__ + pBlock->hasAggr = 1; + pBlock->aggrOffset = offsetAggr; + pBlock->aggrLen = tsizeAggr; +#endif tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, @@ -1187,9 +1227,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper) { - return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast, + return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, TSDB_COMMIT_AGGR_FILE(pCommith), pDataCols, pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), - (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); + (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith)))); } @@ -1611,6 +1651,23 @@ static int 
tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid } } } + + // TSDB_FILE_SMA + SDFile *pRSmaf = TSDB_READ_AGGR_FILE(&(pCommith->readh)); + SDFile *pWSmaf = TSDB_COMMIT_AGGR_FILE(pCommith); + tsdbInitDFileEx(pWSmaf, pRSmaf); + if (tsdbOpenDFile(pWSmaf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + return -1; + } + pCommith->isSmaFileSame = true; } return 0; diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 5ccb9e90f2..6eb8a35e06 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -37,8 +37,10 @@ typedef struct { #define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD) #define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA) #define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST) +#define TSDB_COMPACT_AGGR_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMA) #define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh)) #define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh)) +#define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh)) static int tsdbAsyncCompact(STsdbRepo *pRepo); static void tsdbStartCompact(STsdbRepo *pRepo); @@ -56,7 +58,7 @@ static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet); static void tsdbCompactFSetEnd(SCompactH *pComph); static int tsdbCompactFSetImpl(SCompactH *pComph); static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf, - void **ppCBuf); + void **ppCBuf, void **ppExBuf); enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT}; int tsdbCompact(STsdbRepo *pRepo) { return 
tsdbAsyncCompact(pRepo); } @@ -421,6 +423,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { SBlockIdx blkIdx; void ** ppBuf = &(TSDB_COMPACT_BUF(pComph)); void ** ppCBuf = &(TSDB_COMPACT_COMP_BUF(pComph)); + void ** ppExBuf = &(TSDB_COMPACT_EXBUF(pComph)); int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); taosArrayClear(pComph->aBlkIdx); @@ -451,7 +454,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { // Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file if (pComph->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) { - if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf) < 0) { + if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } } else { @@ -467,7 +470,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { break; } - if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) { + if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } tdResetDataCols(pComph->pDataCols); @@ -476,7 +479,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } if (pComph->pDataCols->numOfRows > 0 && - tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) { + tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } @@ -499,7 +502,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf, - void **ppCBuf) { + void **ppCBuf, void **ppExBuf) { STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph); STsdbCfg * pCfg = REPO_CFG(pRepo); SDFile * pDFile; @@ -516,7 +519,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { isLast = false; } - if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, pDataCols, &block, isLast, true, ppBuf, ppCBuf) < 0) { + if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, 
TSDB_COMPACT_AGGR_FILE(pComph), pDataCols, &block, isLast, true, + ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } @@ -526,5 +530,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } return 0; -} + } diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 0f13b6108f..bf0d191cf6 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -19,6 +19,7 @@ static const char *TSDB_FNAME_SUFFIX[] = { "head", // TSDB_FILE_HEAD "data", // TSDB_FILE_DATA "last", // TSDB_FILE_LAST + "sma", // TSDB_FILE_SMA(Small Materialized Aggregation) "", // TSDB_FILE_MAX "meta", // TSDB_FILE_META }; diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 74d41cce19..5816ed8556 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -61,9 +61,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { void tsdbDestroyReadH(SReadH *pReadh) { if (pReadh == NULL) return; - + pReadh->pExBuf = taosTZfree(pReadh->pExBuf); +#ifdef __TD_6117__ + pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData); +#endif pReadh->pCBuf = taosTZfree(pReadh->pCBuf); - pReadh->pBuf = taosTZfree(pReadh->pBuf); + pReadh->pRBuf = taosTZfree(pReadh->pRBuf); pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]); pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]); pReadh->pBlkData = taosTZfree(pReadh->pBlkData); @@ -298,38 +300,80 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { ASSERT(pBlock->numOfSubBlocks <= 1); + if (!tsdbQueryFromSMA) { + SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); - SDFile *pDFile = (pBlock->last) ? 
TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); + if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno)); + return -1; + } - if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno)); - return -1; - } + size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); + if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1; - size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); - if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1; + int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size); + if (nread < 0) { + tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, + size); + return -1; + } - int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size); - if (nread < 0) { - tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size); - return -1; - } + if (nread < size) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu + " read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread); + return -1; + } - if (nread < size) { - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tsdbError("vgId:%d block statis part in file %s is 
corrupted, offset:%" PRId64 " expected bytes:%" PRIzu - " read bytes: %" PRId64, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread); - return -1; - } + if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 + " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size); + return -1; + } + } else { +#ifdef __TD_6117__ + SDFile *pDFileAggr = TSDB_READ_AGGR_FILE(pReadh); - if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) { - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size); - return -1; + if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, + tstrerror(terrno)); + return -1; + } + + size_t sizeAggr = TSDB_BLOCK_AGGR_SIZE(pBlock->numOfCols); + if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1; + + int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr); + if (nreadAggr < 0) { + tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno), + (int64_t)pBlock->aggrOffset, sizeAggr); + return -1; + } + + if (nreadAggr < sizeAggr) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu + " read bytes: %" PRId64, 
+ TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr, + nreadAggr); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 + " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr); + return -1; + } +#endif } return 0; @@ -370,29 +414,57 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { } void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) { - SBlockData *pBlockData = pReadh->pBlkData; + if (!tsdbQueryFromSMA) { + SBlockData *pBlockData = pReadh->pBlkData; - for (int i = 0, j = 0; i < numOfCols;) { - if (j >= pBlockData->numOfCols) { - pStatis[i].numOfNull = -1; - i++; - continue; + for (int i = 0, j = 0; i < numOfCols;) { + if (j >= pBlockData->numOfCols) { + pStatis[i].numOfNull = -1; + i++; + continue; + } + + if (pStatis[i].colId == pBlockData->cols[j].colId) { + pStatis[i].sum = pBlockData->cols[j].sum; + pStatis[i].max = pBlockData->cols[j].max; + pStatis[i].min = pBlockData->cols[j].min; + pStatis[i].maxIndex = pBlockData->cols[j].maxIndex; + pStatis[i].minIndex = pBlockData->cols[j].minIndex; + pStatis[i].numOfNull = pBlockData->cols[j].numOfNull; + i++; + j++; + } else if (pStatis[i].colId < pBlockData->cols[j].colId) { + pStatis[i].numOfNull = -1; + i++; + } else { + j++; + } } + } else { + SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData; - if (pStatis[i].colId == pBlockData->cols[j].colId) { - pStatis[i].sum = pBlockData->cols[j].sum; - pStatis[i].max = pBlockData->cols[j].max; - pStatis[i].min = pBlockData->cols[j].min; - pStatis[i].maxIndex = pBlockData->cols[j].maxIndex; - pStatis[i].minIndex = pBlockData->cols[j].minIndex; - pStatis[i].numOfNull = pBlockData->cols[j].numOfNull; - i++; - j++; - } else if 
(pStatis[i].colId < pBlockData->cols[j].colId) { - pStatis[i].numOfNull = -1; - i++; - } else { - j++; + for (int i = 0, j = 0; i < numOfCols;) { + if (j >= pAggrBlkData->numOfCols) { + pStatis[i].numOfNull = -1; + i++; + continue; + } + + if (pStatis[i].colId == pAggrBlkData->cols[j].colId) { + pStatis[i].sum = pAggrBlkData->cols[j].sum; + pStatis[i].max = pAggrBlkData->cols[j].max; + pStatis[i].min = pAggrBlkData->cols[j].min; + pStatis[i].maxIndex = pAggrBlkData->cols[j].maxIndex; + pStatis[i].minIndex = pAggrBlkData->cols[j].minIndex; + pStatis[i].numOfNull = pAggrBlkData->cols[j].numOfNull; + i++; + j++; + } else if (pStatis[i].colId < pAggrBlkData->cols[j].colId) { + pStatis[i].numOfNull = -1; + i++; + } else { + j++; + } } } } @@ -653,7 +725,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc return -1; } - if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, + if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pRBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), pBlockCol->colId, offset); -- GitLab