未验证 提交 db0c47cc 编写于 作者: C Cary Xu 提交者: GitHub

Feature/td 11463 3.0 - block-wise SMA (#10510)

* Block-wise SMA extraction

* refactor the SBlock

* add method tsdbLoadBlockOffset

* set method tsdbLoadBlockOffset static

* refactor

* trigger CI

* minor change

* trigger CI
上级 06bbae7d
......@@ -51,6 +51,10 @@ typedef enum {
} ECheckItemType;
typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig;
typedef enum {
TSDB_STATIS_OK = 0, // statis part exist and load successfully
TSDB_STATIS_NONE = 1, // statis part not exist
} ETsdbStatisStatus;
extern char *qtypeStr[];
......
......@@ -27,12 +27,12 @@ extern "C" {
typedef struct SDataStatis {
int16_t colId;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SDataStatis;
typedef struct STable {
......@@ -53,6 +53,8 @@ typedef struct STsdb STsdb;
typedef struct STsdbCfg {
int8_t precision;
int8_t update;
int8_t compression;
uint64_t lruCacheSize;
int32_t daysPerFile;
int32_t minRowsPerFileBlock;
......@@ -60,8 +62,6 @@ typedef struct STsdbCfg {
int32_t keep;
int32_t keep1;
int32_t keep2;
int8_t update;
int8_t compression;
} STsdbCfg;
// query condition to build multi-table data block iterator
......
......@@ -18,8 +18,6 @@
#include "tsdbFile.h"
#define TSDB_FS_VERSION 0
// ================== TSDB global config
extern bool tsdbForceKeepFile;
......
......@@ -44,7 +44,37 @@
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
typedef enum {
TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data
TSDB_FILE_LAST, // .last
TSDB_FILE_SMAD, // .smad(Block-wise SMA)
TSDB_FILE_SMAL, // .smal(Block-wise SMA)
TSDB_FILE_MAX, //
TSDB_FILE_META, // meta
TSDB_FILE_TSMA, // .tsma.${sma_index_name}, Time-range-wise SMA
TSDB_FILE_RSMA, // .rsma.${sma_index_name}, Time-range-wise Rollup SMA
} TSDB_FILE_T;
typedef enum {
TSDB_FS_VER_0 = 0,
TSDB_FS_VER_MAX,
} ETsdbFsVer;
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
switch (fType) {
case TSDB_FILE_HEAD: // .head
case TSDB_FILE_DATA: // .data
case TSDB_FILE_LAST: // .last
case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
default:
return TSDB_LATEST_FVER;
}
}
#if 0
// =============== SMFile
......@@ -169,6 +199,7 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby
// =============== SDFile
typedef struct {
uint32_t magic;
uint32_t fver;
uint32_t len;
uint32_t totalBlocks;
uint32_t totalSubBlocks;
......@@ -188,7 +219,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
......@@ -292,12 +323,18 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
typedef struct {
int fid;
int state;
SDFile files[TSDB_FILE_MAX];
int fid;
int8_t state; // -128~127
uint8_t ver; // 0~255, DFileSet version
uint16_t reserve;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
#define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_STATE(s) ((s)->state)
#define TSDB_FSET_VER(s) ((s)->ver)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
......
......@@ -36,6 +36,7 @@ typedef struct {
TSKEY maxKey;
} SBlockIdx;
#ifdef TD_REFACTOR_3
typedef struct {
int64_t last : 1;
int64_t offset : 63;
......@@ -49,22 +50,35 @@ typedef struct {
TSKEY keyLast;
} SBlock;
#else
typedef enum {
TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_MAX,
} ESBlockVer;
#define SBlockVerLatest TSDB_SBLK_VER_0
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
uint8_t reserve0;
uint8_t last : 1;
uint8_t blkVer : 7;
uint8_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
uint32_t len : 32; // data block length
uint32_t len; // data block length
uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t reserve1 : 8;
uint64_t blkVer : 8;
uint64_t aggrOffset : 56;
uint32_t reserve : 8;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int64_t offset;
uint64_t aggrStat : 1;
uint64_t aggrOffset : 63;
TSKEY keyFirst;
TSKEY keyLast;
} SBlock_3;
} SBlockV0;
#define SBlock SBlockV0 // latest SBlock definition
#endif
typedef struct {
int32_t delimiter; // For recovery usage
......@@ -73,6 +87,7 @@ typedef struct {
SBlock blocks[];
} SBlockInfo;
#ifdef TD_REFACTOR_3
typedef struct {
int16_t colId;
uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
......@@ -89,17 +104,50 @@ typedef struct {
uint8_t offsetH;
char padding[1];
} SBlockCol;
#else
typedef struct {
int16_t colId;
uint8_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint8_t reserve : 7;
uint8_t type;
int32_t len;
uint32_t offset;
} SBlockColV0;
#define SBlockCol SBlockColV0 // latest SBlockCol definition
typedef struct {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SAggrBlkColV0;
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
#endif
// Code here just for back-ward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
#ifdef TD_REFACTOR_3
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
pBlockCol->offsetH = (uint8_t)(offset >> 24);
#else
pBlockCol->offset = offset;
#endif
}
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
#ifdef TD_REFACTOR_3
uint32_t offset1 = pBlockCol->offset;
uint32_t offset2 = pBlockCol->offsetH;
return (offset1 | (offset2 << 24));
#else
return pBlockCol->offset;
#endif
}
typedef struct {
......@@ -109,31 +157,57 @@ typedef struct {
SBlockCol cols[];
} SBlockData;
typedef void SAggrBlkData; // SBlockCol cols[];
struct SReadH {
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData; // Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
STsdb * pRepo;
SDFileSet rSet; // FSET to read
SArray * aBlkIdx; // SBlockIdx array
STable * pTable; // table to read
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
int cidx;
SBlockInfo * pBlkInfo;
SBlockData * pBlkData; // Block info
SAggrBlkData *pAggrBlkData; // Aggregate Block info
SDataCols * pDCols[2];
void * pBuf; // buffer
void * pCBuf; // compression buffer
void * pExBuf; // extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
default:
return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
}
}
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
case TSDB_SBLK_VER_0:
default:
return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
}
}
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
......@@ -147,7 +221,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void * pBuf = *ppBuf;
......
......@@ -50,8 +50,11 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
......@@ -509,7 +512,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
......@@ -560,7 +563,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
......@@ -572,6 +575,74 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
}
// TSDB_FILE_SMAD
SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);
if (access(TSDB_FILE_FULL_NAME(pRSmadF), F_OK) != 0) {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);
if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFileEx(pWSmadF, pRSmadF);
if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
// TSDB_FILE_SMAL
SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) {
tsdbInitDFileEx(pWSmalF, pRSmalF);
if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
return 0;
......@@ -1131,41 +1202,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData * pBlockData = NULL;
SAggrBlkData *pAggrBlkData = NULL;
int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
// Make buffer space
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pBlockData = (SBlockData *)(*ppBuf);
if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull;
SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pBlockCol, 0, sizeof(*pBlockCol));
memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol));
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
pAggrBlkCol->colId = pDataCol->colId;
if (tDataTypes[pDataCol->type].statisFunc) {
#if 0
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
if (pBlockCol->numOfNull == 0) {
#endif
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max),
&(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
&(pAggrBlkCol->numOfNull));
if (pAggrBlkCol->numOfNull == 0) {
TD_SET_COL_ROWS_NORM(pBlockCol);
} else {
TD_SET_COL_ROWS_MISC(pBlockCol);
......@@ -1181,13 +1268,14 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Compress the data if neccessary
int tcol = 0; // counter of not all NULL and written columns
uint32_t toffset = 0;
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
int32_t lsize = tsize;
uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest);
int32_t keyLen = 0;
int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite);
int32_t tBitmaps = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) {
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
......@@ -1248,7 +1336,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
if (ncol != 0) {
tsdbSetBlockColOffset(pBlockCol, toffset);
pBlockCol->len = flen;
tcol++;
++tcol;
} else {
keyLen = flen;
}
......@@ -1269,6 +1357,18 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
return -1;
}
uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0;
if (aggrStatus > 0) {
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
// Write the whole block to file
if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) {
return -1;
}
}
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
......@@ -1280,6 +1380,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
pBlock->aggrStat = aggrStatus;
pBlock->blkVer = SBlockVerLatest;
pBlock->aggrOffset = (uint64_t)offsetAggr;
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
......@@ -1291,9 +1394,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile,
isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols,
pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
......
......@@ -38,6 +38,8 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
......
......@@ -422,7 +422,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
return -1;
}
fsheader.version = TSDB_FS_VERSION;
fsheader.version = TSDB_LATEST_SFS_VER;
if (taosArrayGetSize(pStatus->df) == 0) {
fsheader.len = 0;
} else {
......@@ -697,7 +697,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
ptr = tsdbDecodeFSHeader(ptr, &fsheader);
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
if (fsheader.version != TSDB_FS_VERSION) {
if (fsheader.version != TSDB_LATEST_SFS_VER) {
// TODO: handle file version change
}
......
......@@ -19,8 +19,12 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST
"smad", // TSDB_FILE_SMAD
"smal", // TSDB_FILE_SMAL
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
"tsma", // TSDB_FILE_TSMA
"rsma", // TSDB_FILE_RSMA
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
......@@ -304,6 +308,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
......@@ -341,7 +346,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
}
static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
char *aname;
char *aname = NULL;
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = taosDecodeString(buf, &aname);
......@@ -352,7 +357,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return buf;
}
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
......@@ -382,6 +387,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
}
pDFile->info.size += TSDB_FILE_HEAD_SIZE;
pDFile->info.fver = tsdbGetDFSVersion(fType);
if (tsdbUpdateDFileHeader(pDFile) < 0) {
tsdbCloseDFile(pDFile);
......@@ -493,6 +499,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeFixedU32(buf, pInfo->magic);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
......@@ -505,6 +512,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
......@@ -562,8 +570,10 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
// ============== Operations on SDFileSet
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
pSet->fid = fid;
pSet->state = 0;
TSDB_FSET_FID(pSet) = fid;
TSDB_FSET_VER(pSet) = TSDB_LATEST_FSET_VER;
TSDB_FSET_STATE(pSet) = 0;
pSet->reserve = 0;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
......@@ -572,7 +582,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3
}
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
pSet->fid = pOSet->fid;
TSDB_FSET_FID(pSet) = TSDB_FSET_FID(pOSet);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
}
......@@ -581,7 +591,10 @@ void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
// state not included
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
tlen += taosEncodeFixedU16(buf, pSet->reserve);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -590,11 +603,11 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
}
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
TSDB_FSET_STATE(pSet) = 0;
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0;
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -604,7 +617,9 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pSet->fid);
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
tlen += taosEncodeFixedU16(buf, pSet->reserve);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -613,10 +628,10 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
}
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
buf = taosDecodeFixedI32(buf, &(fid));
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
......@@ -637,7 +652,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet);
return -1;
......
......@@ -821,9 +821,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
tsdbLoadBlockStatis(pReadh, pBlock);
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
loadStatisData = true;
if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
}
}
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
......
......@@ -3276,8 +3276,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
}
int64_t stime = taosGetTimestampUs();
if (tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock) < 0) {
int statisStatus = tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
if (statisStatus < TSDB_STATIS_OK) {
return terrno;
} else if (statisStatus > TSDB_STATIS_OK) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
int16_t* colIds = pHandle->defaultLoadColumn->pData;
......@@ -3288,7 +3292,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
pHandle->statis[i].colId = colIds[i];
}
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
......
......@@ -19,6 +19,7 @@
static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int numOfBitmaps, int lenOfBitmaps, int maxPoints, char *buffer,
......@@ -63,10 +64,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
void tsdbDestroyReadH(SReadH *pReadh) {
if (pReadh == NULL) return;
pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
pReadh->pBuf = taosTZfree(pReadh->pBuf);
pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);
pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData);
pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);
pReadh->cidx = 0;
......@@ -305,15 +308,58 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (!pBlock->aggrStat) {
return TSDB_STATIS_NONE;
}
SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset,
tstrerror(terrno));
return -1;
}
size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
return 0;
}
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
size_t size = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
......@@ -337,7 +383,6 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
return 0;
}
......@@ -375,13 +420,14 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
return buf;
}
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) {
#ifdef TD_REFACTOR_3
SBlockData *pBlockData = pReadh->pBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlockData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
++i;
continue;
}
......@@ -392,15 +438,45 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
i++;
j++;
++i;
++j;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
++i;
} else {
j++;
++j;
}
}
#else
if (pBlock->aggrStat) {
SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlock->numOfCols) {
pStatis[i].numOfNull = -1;
++i;
continue;
}
SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData)) + j;
if (pStatis[i].colId == pAggrBlkCol->colId) {
pStatis[i].sum = pAggrBlkCol->sum;
pStatis[i].max = pAggrBlkCol->max;
pStatis[i].min = pAggrBlkCol->min;
pStatis[i].maxIndex = pAggrBlkCol->maxIndex;
pStatis[i].minIndex = pAggrBlkCol->minIndex;
pStatis[i].numOfNull = pAggrBlkCol->numOfNull;
++i;
++j;
} else if (pStatis[i].colId < pAggrBlkCol->colId) {
pStatis[i].numOfNull = -1;
++i;
} else {
++j;
}
}
}
#endif
}
static void tsdbResetReadTable(SReadH *pReadh) {
......@@ -449,7 +525,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return -1;
}
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
......@@ -592,7 +668,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols);
// If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1;
if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
pDataCols->numOfRows = pBlock->numOfRows;
......@@ -686,7 +762,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;
int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + tsdbGetBlockColOffset(pBlockCol);
int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) +
tsdbGetBlockColOffset(pBlockCol);
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
......
......@@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册