From 4eaecd11deee11a0bb8678a365298bee01351703 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Jul 2023 18:10:36 +0800 Subject: [PATCH] tsdb/cache: merge fs & stt row --- source/dnode/vnode/src/tsdb/tsdbCache.c | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 00072a95f1..348ecf17ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1919,6 +1919,8 @@ typedef struct SFSNextRowIter { SFSLastIter lastIter; SFSLastIter *pLastIter; TSDBROW *pLastRow; + SRow *pTSRow; + SRowMerger rowMerger; SCacheRowsReader *pr; struct CacheNextRowIter *pRowIter; } SFSNextRowIter; @@ -1936,6 +1938,11 @@ static void clearLastFileSet(SFSNextRowIter *state) { if (state->pFileReader) { tsdbDataFileReaderClose(&state->pFileReader); } + + if (state->pTSRow) { + taosMemoryFree(state->pTSRow); + state->pTSRow = NULL; + } } static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, @@ -2148,6 +2155,35 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie return code; } else { // TODO: merge rows and *ppRow = mergedRow + SRowMerger *pMerger = &state->rowMerger; + tsdbRowMergerInit(pMerger, state->pTSchema); + + code = tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + code = tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (state->pTSRow) { + taosMemoryFree(state->pTSRow); + state->pTSRow = NULL; + } + + code = tsdbRowMergerGetRow(pMerger, &state->pTSRow); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow); + *ppRow = &state->row; + --state->iRow; + + tsdbRowMergerClear(pMerger); + + return code; } } -- GitLab