未验证 提交 0142f8a6 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #12188 from taosdata/feature/TD-13066-3.0

feat: bitmap operation optimization
...@@ -325,9 +325,8 @@ typedef struct { ...@@ -325,9 +325,8 @@ typedef struct {
typedef struct { typedef struct {
int16_t colId; int16_t colId;
uint16_t type : 6; uint16_t type : 6;
uint16_t blen : 10; // bitmap length(TODO: full UT for the bitmap compress of various data input) uint16_t blen : 10; // 0 no bitmap if all rows are NORM, > 0 bitmap length
uint32_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows uint32_t len; // data length + bitmap length
uint32_t len : 31; // data length + bitmap length
uint32_t offset; uint32_t offset;
} SBlockColV0; } SBlockColV0;
......
...@@ -943,16 +943,16 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF ...@@ -943,16 +943,16 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
&(pAggrBlkCol->numOfNull)); &(pAggrBlkCol->numOfNull));
if (pAggrBlkCol->numOfNull == 0) { if (pAggrBlkCol->numOfNull == 0) {
TD_SET_COL_ROWS_NORM(pBlockCol); pBlockCol->blen = 0;
} else { } else {
TD_SET_COL_ROWS_MISC(pBlockCol); pBlockCol->blen = 1;
} }
++nColsOfBlockSma; ++nColsOfBlockSma;
} else if (tdIsBitmapBlkNorm(pDataCol->pBitmap, rowsToWrite, pDataCols->bitmapMode)) { } else if (tdIsBitmapBlkNorm(pDataCol->pBitmap, rowsToWrite, pDataCols->bitmapMode)) {
// check if all rows normal // check if all rows normal
TD_SET_COL_ROWS_NORM(pBlockCol); pBlockCol->blen = 0;
} else { } else {
TD_SET_COL_ROWS_MISC(pBlockCol); pBlockCol->blen = 1;
} }
++nColsNotAllNull; ++nColsNotAllNull;
...@@ -985,7 +985,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF ...@@ -985,7 +985,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
int32_t tBitmaps = 0; int32_t tBitmaps = 0;
int32_t tBitmapsLen = 0; int32_t tBitmapsLen = 0;
if ((ncol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) { if ((ncol != 0) && (pBlockCol->blen > 0)) {
tBitmaps = isSuper ? sBitmaps : nBitmaps; tBitmaps = isSuper ? sBitmaps : nBitmaps;
} }
#endif #endif
......
...@@ -1619,7 +1619,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1619,7 +1619,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
SCellVal sVal = {0}; SCellVal sVal = {0};
TSKEY rowKey = TSKEY_INITIAL_VAL; TSKEY rowKey = TSKEY_INITIAL_VAL;
int32_t nResult = 0; int32_t nResult = 0;
bool isMerge = true; int32_t mergeOption = 0; // 0 discard 1 overwrite 2 merge
// the schema version info is embeded in STSRow // the schema version info is embeded in STSRow
int32_t numOfColsOfRow1 = 0; int32_t numOfColsOfRow1 = 0;
...@@ -1715,12 +1715,18 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1715,12 +1715,18 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
rowKey = *(TSKEY*)sVal.val; rowKey = *(TSKEY*)sVal.val;
if (rowKey != *lastRowKey) { if (rowKey != *lastRowKey) {
isMerge = false; mergeOption = 1;
if (*lastRowKey != TSKEY_INITIAL_VAL) { if (*lastRowKey != TSKEY_INITIAL_VAL) {
++(*curRow); ++(*curRow);
} }
++nResult; ++nResult;
} else if (update){
mergeOption = 2;
} else {
mergeOption = 0;
break;
} }
*lastRowKey = rowKey; *lastRowKey = rowKey;
} }
} else { } else {
...@@ -1730,11 +1736,16 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1730,11 +1736,16 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal); tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
rowKey = *(TSKEY*)sVal.val; rowKey = *(TSKEY*)sVal.val;
if (rowKey != *lastRowKey) { if (rowKey != *lastRowKey) {
isMerge = false; mergeOption = 1;
if (*lastRowKey != TSKEY_INITIAL_VAL) { if (*lastRowKey != TSKEY_INITIAL_VAL) {
++(*curRow); ++(*curRow);
} }
++nResult; ++nResult;
} else if(update) {
mergeOption = 2;
} else {
mergeOption = 0;
break;
} }
*lastRowKey = rowKey; *lastRowKey = rowKey;
} else { } else {
...@@ -1754,7 +1765,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1754,7 +1765,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
colDataAppend(pColInfo, *curRow, NULL, true); colDataAppend(pColInfo, *curRow, NULL, true);
} else if (tdValTypeIsNone(sVal.valType)) { } else if (tdValTypeIsNone(sVal.valType)) {
// TODO: Set null if nothing append for this row // TODO: Set null if nothing append for this row
if (!isMerge) { if (mergeOption == 1) {
colDataAppend(pColInfo, *curRow, NULL, true); colDataAppend(pColInfo, *curRow, NULL, true);
} }
} else { } else {
...@@ -1769,14 +1780,14 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1769,14 +1780,14 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++k; ++k;
} }
} else { } else {
if (!isMerge) { if (mergeOption == 1) {
colDataAppend(pColInfo, *curRow, NULL, true); colDataAppend(pColInfo, *curRow, NULL, true);
} }
++i; ++i;
} }
} }
if (*lastRowKey != rowKey) { if (mergeOption == 1) {
while (i < numOfCols) { // the remain columns are all null data while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
colDataAppend(pColInfo, *curRow, NULL, true); colDataAppend(pColInfo, *curRow, NULL, true);
...@@ -2008,7 +2019,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2008,7 +2019,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, true, &lastRowKey); pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey);
// numOfRows += 1; // numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -2065,7 +2076,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2065,7 +2076,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, true, &lastRowKey); pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey);
// ++numOfRows; // ++numOfRows;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -2747,7 +2758,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -2747,7 +2758,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
rv = TD_ROW_SVER(row); rv = TD_ROW_SVER(row);
} }
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
NULL, true, &lastRowKey); NULL, pCfg->update, &lastRowKey);
if (numOfRows >= maxRowsToRead) { if (numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
......
...@@ -616,7 +616,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -616,7 +616,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
tcolId = pBlockCol->colId; tcolId = pBlockCol->colId;
toffset = tsdbGetBlockColOffset(pBlockCol); toffset = tsdbGetBlockColOffset(pBlockCol);
tlen = pBlockCol->len; tlen = pBlockCol->len;
pDataCol->bitmap = pBlockCol->bitmap; pDataCol->bitmap = pBlockCol->blen > 0 ? 1 : 0;
} else { } else {
ASSERT(pDataCol->colId == tcolId); ASSERT(pDataCol->colId == tcolId);
TD_SET_COL_ROWS_NORM(pDataCol); TD_SET_COL_ROWS_NORM(pDataCol);
...@@ -624,17 +624,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat ...@@ -624,17 +624,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
// int32_t tBitmaps = 0; // int32_t tBitmaps = 0;
int32_t tLenBitmap = 0; int32_t tLenBitmap = 0;
if ((dcol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) { if ((dcol != 0) && (pBlockCol->blen > 0)) {
tLenBitmap = nBitmaps; tLenBitmap = nBitmaps;
#if 0
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
tBitmaps = nBitmaps;
tLenBitmap = tBitmaps;
} else {
tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]);
tLenBitmap = tBitmaps * TYPE_BYTES[pDataCol->type];
}
#endif
} }
if (tcolId == pDataCol->colId) { if (tcolId == pDataCol->colId) {
...@@ -784,8 +775,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * ...@@ -784,8 +775,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
blockCol.colId = colId; blockCol.colId = colId;
TD_SET_COL_ROWS_NORM(&blockCol); // default is NORM for the primary key column blockCol.blen = 0; // default is NORM for the primary key column
blockCol.blen = 0;
blockCol.len = pBlock->keyLen; blockCol.len = pBlock->keyLen;
blockCol.type = pDataCol->type; blockCol.type = pDataCol->type;
blockCol.offset = TSDB_KEY_COL_OFFSET; blockCol.offset = TSDB_KEY_COL_OFFSET;
...@@ -815,7 +805,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * ...@@ -815,7 +805,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
ASSERT(pBlockCol->colId == pDataCol->colId); ASSERT(pBlockCol->colId == pDataCol->colId);
} }
// set the bitmap // set the bitmap
pDataCol->bitmap = pBlockCol->bitmap; pDataCol->bitmap = pBlockCol->blen > 0 ? 1 : 0;
if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1; if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
} }
...@@ -833,17 +823,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc ...@@ -833,17 +823,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
// int32_t tBitmaps = 0; // int32_t tBitmaps = 0;
int32_t tLenBitmap = 0; int32_t tLenBitmap = 0;
if (!TD_COL_ROWS_NORM(pBlockCol)) { if (pBlockCol->blen) {
tLenBitmap = nBitmaps; tLenBitmap = nBitmaps;
#if 0
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
tBitmaps = nBitmaps;
tLenBitmap = tBitmaps;
} else {
tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]);
tLenBitmap = tBitmaps * TYPE_BYTES[pDataCol->type];
}
#endif
} }
int tsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES; int tsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册