提交 5e92bfe2 编写于 作者: C Cary Xu

SBlock/SAggrBlock compatibility

上级 bbe45c1b
......@@ -110,6 +110,7 @@ extern int8_t tsCacheLastRow;
//tsdb
extern bool tsdbForceKeepFile;
extern bool tsdbEnableUpgradeFS;
#ifdef __TD_6117__
extern bool tsdbQueryFromSMA;
......
......@@ -156,6 +156,7 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO;
// tsdb config
// For backward compatibility
bool tsdbForceKeepFile = false;
bool tsdbEnableUpgradeFS = false;
#ifdef __TD_6117__
bool tsdbQueryFromSMA = true;
......
......@@ -42,11 +42,10 @@ int32_t main(int32_t argc, char *argv[]) {
}
} else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1;
}
#ifdef __TD_6117__
else if (strcmp(argv[i], "--disable-query-from-sma") == 0) {
} else if (strcmp(argv[i], "--enable-upgrade-fs") == 0) {
tsdbEnableUpgradeFS = false;
} else if (strcmp(argv[i], "--disable-query-from-sma") == 0) {
tsdbQueryFromSMA = false;
#endif
} else if (strcmp(argv[i], "--force-keep-file") == 0) {
tsdbForceKeepFile = true;
} else if (strcmp(argv[i], "--compact-mnode-wal") == 0) {
......
......@@ -131,9 +131,10 @@ typedef struct {
char padding[1];
} SBlockColV1;
#define SBlockColBase SBlockColV0 // base SBlockCol definition
#define SBlockCol SBlockColV1 // latest SBlockCol definition
#define SBlockColV(blkVer) SBlockColV##blkVer
typedef struct {
int16_t colId;
int16_t maxIndex;
......@@ -142,7 +143,11 @@ typedef struct {
int64_t sum;
int64_t max;
int64_t min;
} SAggrBlkCol;
} SAggrBlkColV1;
#define SAggrBlkCol SAggrBlkColV1 // latest SAggrBlkCol definition
#define SAggrBlkColV(blkVer) SAggrBlkColV##blkVer
// Code here is kept only for backward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
......@@ -160,12 +165,12 @@ typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
SBlockCol cols[];
SBlockCol cols[]; // latest definition
} SBlockData;
typedef struct {
int32_t delimiter; // For recovery usage
// int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
// uint64_t uid; // For recovery usage
SAggrBlkCol cols[];
} SAggrBlkData;
......@@ -197,8 +202,32 @@ struct SReadH {
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_AGGR_SIZE(ncols) (sizeof(SAggrBlkData) + sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV(blkVer)) * (ncols) + sizeof(TSCKSUM))
/**
 * Byte size of a block's statistics section for a given block version.
 *
 * The size is computed by TSDB_BLOCK_STATIS_SIZE: the SBlockData header,
 * plus nCols entries of the version-specific SBlockColV<ver> struct, plus
 * a trailing TSCKSUM.
 *
 * @param nCols   number of column entries in the statistics section
 * @param blkVer  block version; TSDB_SBLK_VER_0 selects the V0 column
 *                layout, every other value is sized with the V1 layout
 * @return        size in bytes
 */
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
  // The version argument of the macro is token-pasted, so it must be a literal.
  if (blkVer == TSDB_SBLK_VER_0) {
    return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
  }

  // TSDB_SBLK_VER_1 and any unrecognised version share the V1 layout.
  return TSDB_BLOCK_STATIS_SIZE(nCols, 1);
}
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) \
(sizeof(SAggrBlkData) + sizeof(SAggrBlkColV(blkVer)) * (ncols) + sizeof(TSCKSUM))
/**
 * Byte size of a block's aggregate section for a given block version.
 *
 * The size is computed by TSDB_BLOCK_AGGR_SIZE: the SAggrBlkData header,
 * plus nCols entries of SAggrBlkColV1, plus a trailing TSCKSUM.
 *
 * @param nCols   number of column entries in the aggregate section
 * @param blkVer  block version; TSDB_SBLK_VER_0 is not expected here
 *                (asserts, then yields 0), every other value is sized
 *                with the V1 layout
 * @return        size in bytes, or 0 for TSDB_SBLK_VER_0
 */
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
  if (blkVer == TSDB_SBLK_VER_0) {
    // Callers must not ask for an aggregate size on version 0.
    ASSERT(false);
    return 0;
  }

  // TSDB_SBLK_VER_1 and any unrecognised version share the V1 layout.
  return TSDB_BLOCK_AGGR_SIZE(nCols, 1);
}
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo);
void tsdbDestroyReadH(SReadH *pReadh);
......@@ -212,7 +241,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols);
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
void * pBuf = *ppBuf;
......
......@@ -1068,12 +1068,12 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
// Make buffer space
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pBlockData = (SBlockData *)(*ppBuf);
if (tsdbMakeRoom(ppExBuf, TSDB_BLOCK_AGGR_SIZE(pDataCols->numOfCols)) < 0) {
if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
return -1;
}
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
......@@ -1082,7 +1082,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
SBlockCol * pBlockCol = pBlockData->cols + nColsNotAllNull;
SAggrBlkCol *pAggrBlkCol = pAggrBlkData->cols + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
......@@ -1113,11 +1113,11 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
// Compress the data if necessary
int tcol = 0; // counter of not all NULL and written columns
uint32_t toffset = 0;
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t tsize = tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
int32_t lsize = tsize;
int32_t keyLen = 0;
int32_t tsizeAggr = TSDB_BLOCK_AGGR_SIZE(nColsNotAllNull);
int32_t tsizeAggr = tsdbBlockAggrSize(nColsNotAllNull, SBlockVerLatest);
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
// All not NULL columns finish
......@@ -1186,8 +1186,8 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
}
#ifdef __TD_6117__
pAggrBlkData->delimiter = TSDB_FILE_DELIMITER;
pAggrBlkData->uid = TABLE_UID(pTable);
// pAggrBlkData->delimiter = TSDB_FILE_DELIMITER;
// pAggrBlkData->uid = TABLE_UID(pTable);
pAggrBlkData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
......@@ -1212,6 +1212,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
pBlock->keyLast = dataColsKeyLast(pDataCols);
#ifdef __TD_6117__
pBlock->hasAggr = 1;
pBlock->blkVer = SBlockVerLatest;
pBlock->aggrOffset = offsetAggr;
pBlock->aggrLen = tsizeAggr;
#endif
......
......@@ -341,8 +341,7 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
return -1;
}
// add switch to control
if (tsdbRefactorFS(pRepo) < 0) {
if (tsdbEnableUpgradeFS && tsdbRefactorFS(pRepo) < 0) {
tsdbError("vgId:%d failed to refactor FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
......
......@@ -708,7 +708,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
tsdbLoadBlockStatis(pReadh, pBlock);
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
}
......
......@@ -3379,7 +3379,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle->statis[i].colId = colIds[i];
}
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
......
......@@ -417,18 +417,15 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
return 0;
}
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks <= 1);
static int tsdbLoadBlockStatisFromData(SReadH *pReadh, SBlock *pBlock) {
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
size_t size = tsdbBlockStatisSize(pBlock->numOfCols, pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) return -1;
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
......@@ -452,50 +449,58 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
return 0;
}
#ifdef __TD_6117__
if (tsdbQueryFromSMA) {
SDFile *pDFileAggr = TSDB_READ_AGGR_FILE(pReadh);
static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) {
if(!pBlock->hasAggr) {
return 0;
}
SDFile *pDFileAggr = TSDB_READ_AGGR_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset,
tstrerror(terrno));
return -1;
}
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset,
tstrerror(terrno));
return -1;
}
size_t sizeAggr = TSDB_BLOCK_AGGR_SIZE(pBlock->numOfCols);
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, pBlock->blkVer);
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(int64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
if (nreadAggr < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
(int64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
return -1;
}
if (nreadAggr < sizeAggr) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr,
nreadAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64
" len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (int64_t)pBlock->aggrOffset, sizeAggr);
return -1;
}
return 0;
}
#endif
/**
 * Load the statistics of one block into the read handle, choosing the
 * source by block version.
 *
 * Version 0 blocks are loaded through tsdbLoadBlockStatisFromData, all
 * other versions through tsdbLoadBlockStatisFromAggr. The block must not
 * have sub-blocks (asserted).
 *
 * @param pReadh  read handle passed through to the selected loader
 * @param pBlock  block descriptor; blkVer selects the loader
 * @return        0 on success, -1 on failure (as reported by the loader)
 */
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  // No unconditional return ahead of the dispatch: it made both loaders
  // unreachable, so statistics were never actually read.
  if (pBlock->blkVer == TSDB_SBLK_VER_0) {
    return tsdbLoadBlockStatisFromData(pReadh, pBlock);
  }

  return tsdbLoadBlockStatisFromAggr(pReadh, pBlock);
}
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
......@@ -532,10 +537,8 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
return buf;
}
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
if (!tsdbQueryFromSMA) {
ASSERT(0); // remove Statis from .data/.last
#if 0
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) {
if (pBlock->blkVer == TSDB_SBLK_VER_0) {
SBlockData *pBlockData = pReadh->pBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
......@@ -544,25 +547,24 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
i++;
continue;
}
if (pStatis[i].colId == pBlockData->cols[j].colId) {
pStatis[i].sum = pBlockData->cols[j].sum;
pStatis[i].max = pBlockData->cols[j].max;
pStatis[i].min = pBlockData->cols[j].min;
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
SBlockColV0 *pSBlkCol = ((SBlockColV0 *)(pBlockData->cols)) + j;
if (pStatis[i].colId == pSBlkCol->colId) {
pStatis[i].sum = pSBlkCol->sum;
pStatis[i].max = pSBlkCol->max;
pStatis[i].min = pSBlkCol->min;
pStatis[i].maxIndex = pSBlkCol->maxIndex;
pStatis[i].minIndex = pSBlkCol->minIndex;
pStatis[i].numOfNull = pSBlkCol->numOfNull;
i++;
j++;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
} else if (pStatis[i].colId < pSBlkCol->colId) {
pStatis[i].numOfNull = -1;
i++;
} else {
j++;
}
}
#endif
} else {
} else if (pBlock->hasAggr) {
SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;
for (int i = 0, j = 0; i < numOfCols;) {
......@@ -571,17 +573,17 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
i++;
continue;
}
if (pStatis[i].colId == pAggrBlkData->cols[j].colId) {
pStatis[i].sum = pAggrBlkData->cols[j].sum;
pStatis[i].max = pAggrBlkData->cols[j].max;
pStatis[i].min = pAggrBlkData->cols[j].min;
pStatis[i].maxIndex = pAggrBlkData->cols[j].maxIndex;
pStatis[i].minIndex = pAggrBlkData->cols[j].minIndex;
pStatis[i].numOfNull = pAggrBlkData->cols[j].numOfNull;
SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData->cols)) + j;
if (pStatis[i].colId == pAggrBlkCol->colId) {
pStatis[i].sum = pAggrBlkCol->sum;
pStatis[i].max = pAggrBlkCol->max;
pStatis[i].min = pAggrBlkCol->min;
pStatis[i].maxIndex = pAggrBlkCol->maxIndex;
pStatis[i].minIndex = pAggrBlkCol->minIndex;
pStatis[i].numOfNull = pAggrBlkCol->numOfNull;
i++;
j++;
} else if (pStatis[i].colId < pAggrBlkData->cols[j].colId) {
} else if (pStatis[i].colId < pAggrBlkCol->colId) {
pStatis[i].numOfNull = -1;
i++;
} else {
......@@ -637,7 +639,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return -1;
}
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols);
int32_t tsize = tsdbBlockStatisSize(pBlock->numOfCols, pBlock->blkVer);
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
......@@ -825,7 +827,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;
int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + tsdbGetBlockColOffset(pBlockCol);
int64_t offset =
pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, pBlock->blkVer) + tsdbGetBlockColOffset(pBlockCol);
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册