diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d9288753f312f5d303ad68a29371241683dda6a2..f97de5eecf411b9b75d3a8b1cddeb281d1cbfdcc 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,6 +64,7 @@ typedef struct SDelFWriter SDelFWriter; typedef struct SDelFReader SDelFReader; typedef struct SRowIter SRowIter; typedef struct STsdbFS STsdbFS; +typedef struct SRowMerger SRowMerger; #define TSDB_MAX_SUBBLOCKS 8 @@ -86,6 +87,11 @@ int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); // SRowIter void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); SColVal *tRowIterNext(SRowIter *pIter); +// SRowMerger +int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); +void tRowMergerClear(SRowMerger *pMerger); +int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow); +int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow); // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); // TSDBKEY @@ -429,6 +435,7 @@ struct SDelIdx { }; struct SDelFile { + int64_t commitID; TSKEY minKey; TSKEY maxKey; int64_t minVersion; @@ -462,25 +469,21 @@ struct SHeadFile { int64_t commitID; int64_t size; int64_t offset; - int32_t nRef; }; struct SDataFile { int64_t commitID; int64_t size; - int32_t nRef; }; struct SLastFile { int64_t commitID; int64_t size; - int32_t nRef; }; struct SSmaFile { int64_t commitID; int64_t size; - int32_t nRef; }; struct SDFileSet { @@ -490,7 +493,10 @@ struct SDFileSet { SDataFile *pDataFile; SLastFile *pLastFile; SSmaFile *pSmaFile; - int32_t nRef; + // SHeadFile headFile; + // SDataFile dataFile; + // SLastFile lastFile; + // SSmaFile smaFile; }; struct SRowIter { @@ -499,6 +505,11 @@ struct SRowIter { SColVal colVal; int32_t i; }; +struct SRowMerger { + STSchema *pTSchema; + int64_t version; + SArray *pArray; // SArray +}; #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7814d5905b4014723b8623c36eaa373740e8ac70..445a4c5ae924bb2010f849c8dd0360f5d34a3ac9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -896,7 +896,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { // start ==================== code = tsdbCommitDataStart(pCommitter); - if (code) return code; + if (code) goto _err; // impl ==================== pCommitter->nextKey = pMemTable->minKey; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index acff2dfa5433ec4dede7f41ccc74dc305807712f..a9c91119ae6e68b27c777685c56d4beb27ef223e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -586,6 +586,83 @@ SColVal *tRowIterNext(SRowIter *pIter) { return NULL; } +// SRowMerger ====================================================== +int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + TSDBKEY key = tsdbRowKey(pRow); + SColVal *pColVal = &(SColVal){0}; + STColumn *pTColumn; + + pMerger->pTSchema = pTSchema; + pMerger->version = key.version; + + pMerger->pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); + if (pMerger->pArray == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // ts + pTColumn = &pTSchema->columns[0]; + + ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // other + for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { + tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + +_exit: + return code; +} + +void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } + +int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { + int32_t code = 0; + TSDBKEY key = tsdbRowKey(pRow); + SColVal *pColVal = &(SColVal){0}; + + ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); + + for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { + tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal); + if (pColVal->isNone) continue; + + // SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); + + if (key.version > pMerger->version) { + // forward merge (todo) + } else if (key.version < pMerger->version) { + // backward merge (todo) + } else { + ASSERT(0); + } + } + + pMerger->version = key.version; + +_exit: + return code; +} + +int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) { + int32_t code = 0; + // TODO + ASSERT(0); + return code; +} + // delete skyline ====================================================== static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { int32_t code = 0;