diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 00072a95f12b284e06072d9147e78df97d486068..348ecf17ace18f8579fec6926921e6cf3d6905db 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1919,6 +1919,8 @@ typedef struct SFSNextRowIter { SFSLastIter lastIter; SFSLastIter *pLastIter; TSDBROW *pLastRow; + SRow *pTSRow; + SRowMerger rowMerger; SCacheRowsReader *pr; struct CacheNextRowIter *pRowIter; } SFSNextRowIter; @@ -1936,6 +1938,11 @@ static void clearLastFileSet(SFSNextRowIter *state) { if (state->pFileReader) { tsdbDataFileReaderClose(&state->pFileReader); } + + if (state->pTSRow) { + taosMemoryFree(state->pTSRow); + state->pTSRow = NULL; + } } static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, @@ -2148,6 +2155,35 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie return code; } else { // TODO: merge rows and *ppRow = mergedRow + SRowMerger *pMerger = &state->rowMerger; + tsdbRowMergerInit(pMerger, state->pTSchema); + + code = tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + code = tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + if (state->pTSRow) { + taosMemoryFree(state->pTSRow); + state->pTSRow = NULL; + } + + code = tsdbRowMergerGetRow(pMerger, &state->pTSRow); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow); + *ppRow = &state->row; + --state->iRow; + + tsdbRowMergerClear(pMerger); + + return code; } }