未验证 提交 af4532dc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #20476 from taosdata/fix/TD-23163

enh(tsdb/cache): skip invalid datablock directly when traversing fs with last
...@@ -596,7 +596,8 @@ typedef struct { ...@@ -596,7 +596,8 @@ typedef struct {
int64_t lastTs; int64_t lastTs;
} SFSLastNextRowIter; } SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
...@@ -740,7 +741,8 @@ typedef struct SFSNextRowIter { ...@@ -740,7 +741,8 @@ typedef struct SFSNextRowIter {
int64_t lastTs; int64_t lastTs;
} SFSNextRowIter; } SFSNextRowIter;
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SFSNextRowIter *state = (SFSNextRowIter *)iter; SFSNextRowIter *state = (SFSNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
...@@ -828,8 +830,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -828,8 +830,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
} }
} }
case SFSNEXTROW_BLOCKDATA: case SFSNEXTROW_BLOCKDATA:
_next_datablock:
if (state->iBlock >= 0) { if (state->iBlock >= 0) {
SDataBlk block = {0}; SDataBlk block = {0};
bool skipBlock = true;
int inputColIndex = 0;
tDataBlkReset(&block); tDataBlkReset(&block);
tBlockDataReset(state->pBlockData); tBlockDataReset(state->pBlockData);
...@@ -854,6 +859,42 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie ...@@ -854,6 +859,42 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
if (code) goto _err; if (code) goto _err;
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
SColData *pColData = &state->pBlockData->aColData[colIndex];
int16_t cid = pColData->cid;
if (inputColIndex < nCols && cid == aCols[inputColIndex++]) {
if (isLast && pColData->numOfValue != 0) {
skipBlock = false;
break;
} else if (pColData->numOfNone != pColData->nVal) {
skipBlock = false;
break;
}
}
}
if (skipBlock) {
if (--state->iBlock < 0) {
tsdbDataFReaderClose(state->pDataFReader);
*state->pDataFReader = NULL;
// resetLastBlockLoadInfo(state->pLoadInfo);
if (state->aBlockIdx) {
// taosArrayDestroy(state->aBlockIdx);
tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
state->aBlockIdxHandle = NULL;
state->aBlockIdx = NULL;
}
state->state = SFSNEXTROW_FILESET;
goto _next_fileset;
} else {
goto _next_datablock;
}
}
state->nRow = state->blockData.nRow; state->nRow = state->blockData.nRow;
state->iRow = state->nRow - 1; state->iRow = state->nRow - 1;
...@@ -960,7 +1001,8 @@ typedef struct SMemNextRowIter { ...@@ -960,7 +1001,8 @@ typedef struct SMemNextRowIter {
// TSDBROW *curRow; // TSDBROW *curRow;
} SMemNextRowIter; } SMemNextRowIter;
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SMemNextRowIter *state = (SMemNextRowIter *)iter; SMemNextRowIter *state = (SMemNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
*pIgnoreEarlierTs = false; *pIgnoreEarlierTs = false;
...@@ -1072,7 +1114,8 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { ...@@ -1072,7 +1114,8 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
return deleted; return deleted;
} }
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs); typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols);
typedef int32_t (*_next_row_clear_fn_t)(void *iter); typedef int32_t (*_next_row_clear_fn_t)(void *iter);
typedef struct { typedef struct {
...@@ -1222,12 +1265,14 @@ _err: ...@@ -1222,12 +1265,14 @@ _err:
} }
// iterate next row non deleted backward ts, version (from high to low) // iterate next row non deleted backward ts, version (from high to low)
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
int16_t *aCols, int nCols) {
int code = 0; int code = 0;
for (;;) { for (;;) {
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
if (pIter->input[i].next && !pIter->input[i].stop) { if (pIter->input[i].next && !pIter->input[i].stop) {
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs); code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs,
isLast, aCols, nCols);
if (code) goto _err; if (code) goto _err;
if (pIter->input[i].pRow == NULL) { if (pIter->input[i].pRow == NULL) {
...@@ -1358,7 +1403,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo ...@@ -1358,7 +1403,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs); nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, NULL, 0);
if (!pRow) { if (!pRow) {
break; break;
...@@ -1488,11 +1533,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach ...@@ -1488,11 +1533,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
bool ignoreEarlierTs = false; bool ignoreEarlierTs = false;
SArray *pColArray = NULL; SArray *pColArray = NULL;
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
int16_t nCols = nLastCol;
int32_t code = initLastColArray(pTSchema, &pColArray); int32_t code = initLastColArray(pTSchema, &pColArray);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
if (NULL == aColArray) {
taosArrayDestroy(pColArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
TSKEY lastRowTs = TSKEY_MAX; TSKEY lastRowTs = TSKEY_MAX;
...@@ -1502,7 +1554,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach ...@@ -1502,7 +1554,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs); nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
if (!pRow) { if (!pRow) {
break; break;
...@@ -1547,9 +1599,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach ...@@ -1547,9 +1599,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
} }
if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { if (!COL_VAL_IS_VALUE(pColVal)) {
noneCol = iCol; taosArrayPush(aColArray, &pColVal->cid);
setNoneCol = true; if (!setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
} }
} }
if (!setNoneCol) { if (!setNoneCol) {
...@@ -1590,6 +1645,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach ...@@ -1590,6 +1645,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
} }
taosArraySet(pColArray, iCol, &lastCol); taosArraySet(pColArray, iCol, &lastCol);
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
taosArrayRemove(aColArray, aColIndex);
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
noneCol = iCol; noneCol = iCol;
setNoneCol = true; setNoneCol = true;
...@@ -1613,6 +1670,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach ...@@ -1613,6 +1670,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
//} //}
nextRowIterClose(&iter); nextRowIterClose(&iter);
taosArrayDestroy(aColArray);
// taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
return code; return code;
...@@ -1621,6 +1679,7 @@ _err: ...@@ -1621,6 +1679,7 @@ _err:
// taosMemoryFreeClear(pTSchema); // taosMemoryFreeClear(pTSchema);
*ppLastArray = NULL; *ppLastArray = NULL;
taosArrayDestroy(pColArray); taosArrayDestroy(pColArray);
taosArrayDestroy(aColArray);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册