diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 244530314a3556d5359c145f44a381f1049e5aa1..786d1c36fbf17057e21a4a274ed41fd8eb01fc1c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -593,9 +593,10 @@ typedef struct { SMergeTree mergeTree; SMergeTree *pMergeTree; SSttBlockLoadInfo *pLoadInfo; + int64_t lastTs; } SFSLastNextRowIter; -static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { +static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; int32_t code = 0; @@ -641,15 +642,27 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { } state->state = SFSLASTNEXTROW_BLOCKROW; } - case SFSLASTNEXTROW_BLOCKROW: - state->row = tMergeTreeGetRow(&state->mergeTree); - *ppRow = &state->row; - bool hasVal = tMergeTreeNext(&state->mergeTree); + case SFSLASTNEXTROW_BLOCKROW: { + bool hasVal = false; + do { + state->row = tMergeTreeGetRow(&state->mergeTree); + *ppRow = &state->row; + hasVal = tMergeTreeNext(&state->mergeTree); + } while (TSDBROW_TS(&state->row) <= state->lastTs && hasVal); + + if (TSDBROW_TS(&state->row) <= state->lastTs) { + *pIgnoreEarlierTs = true; + state->state = SFSLASTNEXTROW_FILESET; + goto _next_fileset; + } + + *pIgnoreEarlierTs = false; if (!hasVal) { state->state = SFSLASTNEXTROW_FILESET; } return code; + } default: ASSERT(0); break; @@ -725,7 +738,7 @@ typedef struct SFSNextRowIter { int64_t lastTs; } SFSNextRowIter; -static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { +static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { SFSNextRowIter *state = (SFSNextRowIter *)iter; int32_t code = 0; @@ -821,8 +834,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); if (block.maxKey.ts <= state->lastTs) { + *pIgnoreEarlierTs = true; goto _next_fileset; } + *pIgnoreEarlierTs = false; tBlockDataReset(state->pBlockData); TABLEID tid = {.suid = state->suid, .uid = state->uid}; code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); @@ -932,16 +947,23 @@ typedef struct SMemNextRowIter { SMEMNEXTROWSTATES state; STbData *pMem; // [input] STbDataIter iter; // mem buffer skip list iterator + int64_t lastTs; // bool iterOpened; // TSDBROW *curRow; } SMemNextRowIter; -static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) { +static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { SMemNextRowIter *state = (SMemNextRowIter *)iter; int32_t code = 0; + *pIgnoreEarlierTs = false; switch (state->state) { case SMEMNEXTROW_ENTER: { if (state->pMem != NULL) { + if (state->pMem->maxKey <= state->lastTs) { + *ppRow = NULL; + *pIgnoreEarlierTs = true; + return code; + } tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); @@ -1042,13 +1064,14 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { return deleted; } -typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow); +typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs); typedef int32_t (*_next_row_clear_fn_t)(void *iter); typedef struct { TSDBROW *pRow; bool stop; bool next; + bool ignoreEarlierTs; void *iter; _next_row_fn_t nextRowFn; _next_row_clear_fn_t nextRowClearFn; @@ -1132,6 +1155,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsLastState.uid = uid; pIter->fsLastState.pLoadInfo = pLoadInfo; pIter->fsLastState.pDataFReader = pDataFReaderLast; + pIter->fsLastState.lastTs = lastTs; pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.pTsdb = pTsdb; @@ -1144,16 +1168,17 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsState.pDataFReader = pDataFReader; pIter->fsState.lastTs = lastTs; - pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; - pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; - pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow, false, true, &pIter->fsLastState, getNextRowFromFSLast, - clearNextRowFromFSLast}; + pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; + pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; + pIter->input[2] = (TsdbNextRowState){ + &pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast}; pIter->input[3] = - (TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; + (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; if (pMem) { pIter->memState.pMem = pMem; pIter->memState.state = SMEMNEXTROW_ENTER; + pIter->memState.lastTs = lastTs; pIter->input[0].stop = false; pIter->input[0].next = true; } @@ -1161,6 +1186,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs if (pIMem) { pIter->imemState.pMem = pIMem; pIter->imemState.state = SMEMNEXTROW_ENTER; + pIter->imemState.lastTs = lastTs; pIter->input[1].stop = false; pIter->input[1].next = true; } @@ -1188,12 +1214,12 @@ _err: } // iterate next row non deleted backward ts, version (from high to low) -static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { +static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { int code = 0; for (;;) { for (int i = 0; i < 4; ++i) { if (pIter->input[i].next && !pIter->input[i].stop) { - code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow); + code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs); if (code) goto _err; if (pIter->input[i].pRow == NULL) { @@ -1205,6 +1231,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) { *ppRow = NULL; + *pIgnoreEarlierTs = (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs || + pIter->input[2].ignoreEarlierTs || pIter->input[3].ignoreEarlierTs); return code; } @@ -1305,6 +1333,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo int16_t noneCol = 0; bool setNoneCol = false; bool hasRow = false; + bool ignoreEarlierTs = false; SArray *pColArray = NULL; SColVal *pColVal = &(SColVal){0}; @@ -1321,7 +1350,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo do { TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow); + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs); if (!pRow) { break; @@ -1421,7 +1450,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo // taosArrayDestroy(pColArray); //} else { if (!hasRow) { - taosArrayClear(pColArray); + if (ignoreEarlierTs) { + taosArrayDestroy(pColArray); + pColArray = NULL; + } else { + taosArrayClear(pColArray); + } } *ppColArray = pColArray; //} @@ -1443,6 +1477,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach int16_t noneCol = 0; bool setNoneCol = false; bool hasRow = false; + bool ignoreEarlierTs = false; SArray *pColArray = NULL; SColVal *pColVal = &(SColVal){0}; @@ -1459,7 +1494,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach do { TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow); + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs); if (!pRow) { break; @@ -1559,7 +1594,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach // taosArrayDestroy(pColArray); //} else { if (!hasRow) { - taosArrayClear(pColArray); + if (ignoreEarlierTs) { + taosArrayDestroy(pColArray); + pColArray = NULL; + } else { + taosArrayClear(pColArray); + } } *ppLastArray = pColArray; //} @@ -1593,8 +1633,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader * SArray *pArray = NULL; bool dup = false; // which is always false for now code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr); - // if table's empty or error, set handle NULL and return - if (code < 0 /* || pArray == NULL*/) { + // if table's empty or error or ignore ignore earlier ts, set handle NULL and return + if (code < 0 || pArray == NULL) { if (!dup && pArray) { taosArrayDestroy(pArray); } @@ -1637,8 +1677,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, if (!h) { SArray *pLastArray = NULL; code = mergeLast(uid, pTsdb, &pLastArray, pr); - // if table's empty or error, set handle NULL and return - if (code < 0 /* || pLastArray == NULL*/) { + // if table's empty or error or ignore ignore earlier ts, set handle NULL and return + if (code < 0 || pLastArray == NULL) { taosThreadMutexUnlock(&pTsdb->lruMutex); *handle = NULL;