diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 3489863f370c4226682bb62289f6cec4cb6c9e86..eb1fbac2c9e150bb8f612cc224e2ad96da2436a9 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -52,6 +52,7 @@ target_sources( "src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbCompress.c" "src/tsdb/tsdbCompact.c" + "src/tsdb/tsdbMergeTree.c" # tq "src/tq/tq.c" diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f3a2cd30794d4a2490ed81e2deb758c6142965bc..968cc7f297569e436a20815d3a075ab907ff4a14 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -620,6 +620,12 @@ struct SDataFReader { uint8_t *aBuf[3]; }; +typedef struct { + int64_t suid; + int64_t uid; + TSDBROW row; +} SRowInfo; + // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0824faf390756f86a927b3d4912ae56de6394e3e..b227e97df6918f12ffa72e4393689bf007517fef 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -20,12 +20,6 @@ typedef struct { STSchema *pTSchema; } SSkmInfo; -typedef struct { - int64_t suid; - int64_t uid; - TSDBROW row; -} SRowInfo; - typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c new file mode 100644 index 0000000000000000000000000000000000000000..1d51025331e6418cf55787baa8e3686df0c311ab --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" + +// SLDataIter ================================================= +typedef struct { + SRBTreeNode node; + SBlockL *pBlockL; + SRowInfo *pRowInfo; + + SDataFReader *pReader; + int32_t iLast; + int8_t backward; + SArray *aBlockL; + int32_t iBlockL; + SBlockData bData; + int32_t iRow; + SRowInfo rInfo; +} SLDataIter; + +int32_t tLDataIterOpen(SLDataIter *pIter, SDataFReader *pReader, int32_t iLast, int8_t backward) { + int32_t code = 0; + + pIter->pReader = pReader; + pIter->iLast = iLast; + pIter->backward = backward; + + pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pIter->aBlockL == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tBlockDataCreate(&pIter->bData); + if (code) goto _exit; + + code = tsdbReadBlockL(pReader, iLast, pIter->aBlockL); + if (code) goto _exit; + + if (backward) { + pIter->iBlockL = taosArrayGetSize(pIter->aBlockL) - 1; + } else { + pIter->iBlockL = 0; + } + +_exit: + return code; +} + +void tLDataIterClose(SLDataIter *pIter) { + tBlockDataDestroy(&pIter->bData, 1); + taosArrayDestroy(pIter->aBlockL); +} + +extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); + +void tLDataIterNextBlock(SLDataIter *pIter) { + if (pIter->backward) { + pIter->iBlockL--; + } else { + pIter->iBlockL++; + } + + if (pIter->iBlockL >= 0 && pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { + pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); + } else { + pIter->pBlockL = NULL; + } +} + +int32_t tLDataIterNextRow(SLDataIter *pIter) { + int32_t code = 0; + + if (pIter->backward) { + pIter->iRow--; + if (pIter->iRow < 0) { + pIter->iBlockL--; + + if (pIter->iBlockL >= 0) { + code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, taosArrayGet(pIter->aBlockL, pIter->iBlockL), + &pIter->bData); + if (code) goto _exit; + } else { + // TODO: no more data here + } + } + } else { + pIter->iRow++; + if (pIter->iRow >= pIter->bData.nRow) { + pIter->iBlockL++; + if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { + code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, taosArrayGet(pIter->aBlockL, pIter->iBlockL), + &pIter->bData); + if (code) goto _exit; + } else { + // TODO: not more data + goto _exit; + } + } + } + + pIter->rInfo.suid = pIter->bData.suid; + pIter->rInfo.uid = pIter->bData.uid; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + +_exit: + return code; +} + +SRowInfo *tLDataIterGet(SLDataIter *pIter) { + // TODO + return NULL; +} + +// SMergeTree ================================================= +typedef struct { + int8_t backward; + SRBTreeNode *pNode; + SRBTree rbt; +} SMergeTree;