提交 cabc2177 编写于 作者: C Cary Xu

[TD-6117]<feature>:Multi-level aggregate query optimization

上级 3b384d82
......@@ -110,6 +110,10 @@ extern int8_t tsCacheLastRow;
//tsdb
extern bool tsdbForceKeepFile;
#ifdef __TD_6117__
extern bool tsdbQueryFromSMA;
#endif
// balance
extern int8_t tsEnableBalance;
extern int8_t tsAlternativeRole;
......
......@@ -156,6 +156,10 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO;
// For backward compatibility
bool tsdbForceKeepFile = false;
#ifdef __TD_6117__
bool tsdbQueryFromSMA = true;
#endif
// balance
int8_t tsEnableBalance = 1;
int8_t tsAlternativeRole = 0;
......
......@@ -42,6 +42,11 @@ int32_t main(int32_t argc, char *argv[]) {
}
} else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1;
}
#ifdef __TD_6117__
else if (strcmp(argv[i], "--disable-query-from-sma") == 0) {
tsdbQueryFromSMA = false;
#endif
} else if (strcmp(argv[i], "--force-keep-file") == 0) {
tsdbForceKeepFile = true;
} else if (strcmp(argv[i], "--compact-mnode-wal") == 0) {
......
......@@ -447,6 +447,8 @@ typedef enum {
TD_ROW_PARTIAL_UPDATE = 2
} TDUpdateConfig;
#define __TD_6117__
extern char *qtypeStr[];
#ifdef __cplusplus
......
......@@ -38,8 +38,8 @@ void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf);
int tsdbApplyRtn(STsdbRepo *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
......
......@@ -16,7 +16,13 @@
#ifndef _TD_TSDB_FS_H_
#define _TD_TSDB_FS_H_
#define TSDB_FS_VERSION 0
/**
* The fileset .head/.data/.last/.sma use the same TSDB_FS_VERSION.
* 0 - original format before 2021.08.25 // TODO update date 2021.08.25 to release version.
* 1 - extract aggregation block data from .data file and save to separated .sma file since 2021.08.25 // TODO update
* date to release version.
*/
#define TSDB_FS_VERSION 1
// ================== CURRENT file header info
typedef struct {
......
......@@ -38,7 +38,7 @@
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_SMA, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
// =============== SMFile
typedef struct {
......
......@@ -42,6 +42,11 @@ typedef struct {
int32_t numOfRows : 24;
int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
#ifdef __TD_6117__
int64_t hasAggr : 1;
int64_t aggrOffset : 63;
int32_t aggrLen;
#endif
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
......@@ -70,6 +75,18 @@ typedef struct {
char padding[1];
} SBlockCol;
typedef struct {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
uint8_t type;
char reserved[15]; // Adjust the size of reserved array whenever adding new field of SAggrBlkCol.
} SAggrBlkCol;
// Code here just for back-ward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
......@@ -88,6 +105,12 @@ typedef struct {
uint64_t uid; // For recovery usage
SBlockCol cols[];
} SBlockData;
typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
SAggrBlkCol cols[];
} SAggrBlkData;
struct SReadH {
STsdbRepo * pRepo;
......@@ -98,9 +121,13 @@ struct SReadH {
int cidx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData; // Block info
#ifdef __TD_6117__
SAggrBlkData *pAggrBlkData; // Block info
#endif
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pRBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
......@@ -110,10 +137,13 @@ struct SReadH {
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_AGGR_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMA)
#define TSDB_READ_BUF(rh) ((rh)->pRBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_AGGR_SIZE(ncols) (sizeof(SAggrBlkData) + sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM))
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
......
......@@ -35,6 +35,7 @@ typedef struct {
SDFileSet wSet;
bool isDFileSame;
bool isLFileSame;
bool isSmaFileSame;
TSKEY minKey;
TSKEY maxKey;
SArray * aBlkIdx; // SBlockIdx array
......@@ -51,8 +52,10 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_AGGR_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMA)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
......@@ -912,7 +915,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) {
} else {
int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision));
if (fid == TSDB_IVLD_FID || fid > tfid) {
fid = tfid;
fid = tfid; // find the least fid
}
}
}
......@@ -1053,11 +1056,12 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) {
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
SAggrBlkData *pAggrBlkData = NULL;
int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
......@@ -1069,24 +1073,38 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
}
pBlockData = (SBlockData *)(*ppBuf);
if (tsdbMakeRoom(ppExBuf, TSDB_BLOCK_AGGR_SIZE(pDataCols->numOfCols)) < 0) {
return -1;
}
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
SAggrBlkCol *pAggrBlkCol = pAggrBlkData->cols + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pBlockCol, 0, sizeof(*pBlockCol));
memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol));
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
pAggrBlkCol->colId = pDataCol->colId;
pAggrBlkCol->type = pDataCol->type;
if (tDataTypes[pDataCol->type].statisFunc) {
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max),
&(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
&(pAggrBlkCol->numOfNull));
}
nColsNotAllNull++;
}
......@@ -1099,12 +1117,15 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t lsize = tsize;
int32_t keyLen = 0;
int32_t tsizeAggr = TSDB_BLOCK_AGGR_SIZE(nColsNotAllNull);
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + tcol;
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol * pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
......@@ -1165,6 +1186,20 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
return -1;
}
#ifdef __TD_6117__
pAggrBlkData->delimiter = TSDB_FILE_DELIMITER;
pAggrBlkData->uid = TABLE_UID(pTable);
pAggrBlkData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
// Write the whole block to file
if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) {
return -1;
}
#endif
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
......@@ -1176,6 +1211,11 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
#ifdef __TD_6117__
pBlock->hasAggr = 1;
pBlock->aggrOffset = offsetAggr;
pBlock->aggrLen = tsizeAggr;
#endif
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
......@@ -1187,9 +1227,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, TSDB_COMMIT_AGGR_FILE(pCommith), pDataCols, pBlock, isLast,
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
}
......@@ -1611,6 +1651,23 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
}
// TSDB_FILE_SMA
SDFile *pRSmaf = TSDB_READ_AGGR_FILE(&(pCommith->readh));
SDFile *pWSmaf = TSDB_COMMIT_AGGR_FILE(pCommith);
tsdbInitDFileEx(pWSmaf, pRSmaf);
if (tsdbOpenDFile(pWSmaf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
pCommith->isSmaFileSame = true;
}
return 0;
......
......@@ -37,8 +37,10 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_AGGR_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMA)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
#define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh))
static int tsdbAsyncCompact(STsdbRepo *pRepo);
static void tsdbStartCompact(STsdbRepo *pRepo);
......@@ -56,7 +58,7 @@ static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet);
static void tsdbCompactFSetEnd(SCompactH *pComph);
static int tsdbCompactFSetImpl(SCompactH *pComph);
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf);
void **ppCBuf, void **ppExBuf);
enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT};
int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); }
......@@ -421,6 +423,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
SBlockIdx blkIdx;
void ** ppBuf = &(TSDB_COMPACT_BUF(pComph));
void ** ppCBuf = &(TSDB_COMPACT_COMP_BUF(pComph));
void ** ppExBuf = &(TSDB_COMPACT_EXBUF(pComph));
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pComph->aBlkIdx);
......@@ -451,7 +454,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
// Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
if (pComph->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf) < 0) {
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
} else {
......@@ -467,7 +470,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
break;
}
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
tdResetDataCols(pComph->pDataCols);
......@@ -476,7 +479,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
if (pComph->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
......@@ -499,7 +502,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf) {
void **ppCBuf, void **ppExBuf) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
......@@ -516,7 +519,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, pDataCols, &block, isLast, true, ppBuf, ppCBuf) < 0) {
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, TSDB_COMPACT_AGGR_FILE(pComph), pDataCols, &block, isLast, true,
ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
......@@ -526,5 +530,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
return 0;
}
}
......@@ -19,6 +19,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST
"sma", // TSDB_FILE_SMA(Small Materialized Aggregation)
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
};
......
......@@ -61,9 +61,11 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
void tsdbDestroyReadH(SReadH *pReadh) {
if (pReadh == NULL) return;
#ifdef __TD_6117__
pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
#endif
pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
pReadh->pBuf = taosTZfree(pReadh->pBuf);
pReadh->pRBuf = taosTZfree(pReadh->pRBuf);
pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);
pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
......@@ -298,38 +300,80 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
if (!tsdbQueryFromSMA) {
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
if (nread < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset,
size);
return -1;
}
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
if (nread < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size);
return -1;
}
if (nread < size) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread);
return -1;
}
if (nread < size) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64
" len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
} else {
#ifdef __TD_6117__
SDFile *pDFileAggr = TSDB_READ_AGGR_FILE(pReadh);
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset,
tstrerror(terrno));
return -1;
}
size_t sizeAggr = TSDB_BLOCK_AGGR_SIZE(pBlock->numOfCols);
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(int64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64
" len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
#endif
}
return 0;
......@@ -370,29 +414,57 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
}
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
SBlockData *pBlockData = pReadh->pBlkData;
if (!tsdbQueryFromSMA) {
SBlockData *pBlockData = pReadh->pBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlockData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
continue;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlockData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
continue;
}
if (pStatis[i].colId == pBlockData->cols[j].colId) {
pStatis[i].sum = pBlockData->cols[j].sum;
pStatis[i].max = pBlockData->cols[j].max;
pStatis[i].min = pBlockData->cols[j].min;
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
i++;
j++;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
} else {
j++;
}
}
} else {
SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;
if (pStatis[i].colId == pBlockData->cols[j].colId) {
pStatis[i].sum = pBlockData->cols[j].sum;
pStatis[i].max = pBlockData->cols[j].max;
pStatis[i].min = pBlockData->cols[j].min;
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
i++;
j++;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
} else {
j++;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pAggrBlkData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
continue;
}
if (pStatis[i].colId == pAggrBlkData->cols[j].colId) {
pStatis[i].sum = pAggrBlkData->cols[j].sum;
pStatis[i].max = pAggrBlkData->cols[j].max;
pStatis[i].min = pAggrBlkData->cols[j].min;
pStatis[i].maxIndex = pAggrBlkData->cols[j].maxIndex;
pStatis[i].minIndex = pAggrBlkData->cols[j].minIndex;
pStatis[i].numOfNull = pAggrBlkData->cols[j].numOfNull;
i++;
j++;
} else if (pStatis[i].colId < pAggrBlkData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
} else {
j++;
}
}
}
}
......@@ -653,7 +725,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return -1;
}
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pRBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
pBlockCol->colId, offset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册