From c7559a81cae2ac229337611f0d86b95148897382 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Jul 2022 23:28:27 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 354 +++++++++++++++--------- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/scanoperator.c | 16 +- tests/script/tsim/tmq/basic1.sim | 1 + 5 files changed, 230 insertions(+), 145 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 1d6784982c..de2905fe96 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -130,7 +130,7 @@ bool tsdbNextDataBlock(STsdbReader *pReader); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); +int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2460c5090f..d83f482ca8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,17 +16,22 @@ #include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +typedef struct { + STbDataIter *iter; + int32_t index; + bool hasVal; +} SIterInfo; + typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; SBlockIdx blockIdx; SArray* pBlockList; // block data index list - bool iterInit; // whether to initialize the in-memory skip list iterator or not - STbDataIter* iter; // mem buffer skip list iterator - STbDataIter* iiter; // imem buffer skip list iterator + SIterInfo iter; // mem buffer skip list iterator + SIterInfo iiter; // imem buffer skip list iterator SArray* delSkyline; // delete info for this table - bool memHasVal; - bool imemHasVal; + int32_t fileDelIndex; + bool iterInit; // whether to initialize the in-memory skip list iterator or not } STableBlockScanInfo; typedef struct SBlockOrderWrapper { @@ -112,14 +117,14 @@ struct STsdbReader { char* idStr; // query info handle, for debug purpose int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; - SArray* prev; // previous row which is before than time window - SArray* next; // next row which is after the query time window + SIOCostSummary cost; STSchema* pSchema; - - SDataFReader* pFileReader; - SVersionRange verRange; + SDataFReader* pFileReader; + SVersionRange verRange; #if 0 + SArray* prev; // previous row which is before than time window + SArray* next; // next row which is after the query time window SFileBlockInfo* pDataBlockInfo; SDataCols* pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size @@ -136,19 +141,19 @@ struct STsdbReader { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); -static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); +static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doMergeRowsInBuf(STbDataIter* pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, - STsdbReader* pReader); +static int32_t doMergeRowsInBuf(SIterInfo *pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); +static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey); -static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow** pTSRow, - STsdbReader* pReader); +static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader); static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); +static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -372,7 +377,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->suid = pCond->suid; pReader->order = pCond->order; pReader->capacity = 4096; - pReader->idStr = strdup(idstr); + pReader->idStr = (idstr != NULL)? strdup(idstr):NULL; pReader->verRange = (SVersionRange) {.minVer = pCond->startVersion, .maxVer = 10000}; pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows); @@ -1708,7 +1713,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc } static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { - if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) { + if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) { return TSDB_CODE_SUCCESS; } @@ -1728,13 +1733,14 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* } static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key) { + STSRow* pTSRow, SIterInfo* pIter, int64_t key) { SRowMerger merge = {0}; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBKEY k = TSDBROW_KEY(pRow); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + SArray* pDelList = pBlockScanInfo->delSkyline; // ascending order traverse if (ASCENDING_TRAVERSE(pReader->order)) { @@ -1744,19 +1750,19 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); } else if (k.ts < key) { // k.ts < key - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader); } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMerge(&merge, pRow); - doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMergerGetRow(&merge, &pTSRow); } } else { // descending order scan if (key < k.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader); } else if (k.ts < key) { tRowMergerInit(&merge, &fRow, pReader->pSchema); @@ -1766,7 +1772,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); @@ -1786,9 +1792,10 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; + SArray* pDelList = pBlockScanInfo->delSkyline; - TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); - TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader); + TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader); ASSERT(pRow != NULL && piRow != NULL); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; @@ -1808,12 +1815,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ik.ts == key) { tRowMerge(&merge, piRow); - doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader); } if (k.ts == key) { tRowMerge(&merge, pRow); - doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, &pTSRow); @@ -1825,7 +1832,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [3] ik.ts < key <= k.ts // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { - doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader); + doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1833,7 +1840,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [5] k.ts < key <= ik.ts // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { - doMergeMultiRows(pRow, uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow, pReader); + doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1853,11 +1860,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader); if (ik.ts == k.ts) { tRowMerge(&merge, piRow); - doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader); } if (k.ts == key) { @@ -1875,7 +1882,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [3] ik.ts > k.ts >= Key // [4] ik.ts > key >= k.ts if (ik.ts > key) { - doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader); + doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1894,7 +1901,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* //[7] key = ik.ts > k.ts if (key == ik.ts) { - doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader); + doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); tRowMerge(&merge, &fRow); @@ -1909,7 +1916,8 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(0); } -static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STsdbReader* pReader) { +static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STableBlockScanInfo* pBlockScanInfo, + STsdbReader* pReader) { // check for version and time range int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex]; if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) { @@ -1921,6 +1929,11 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum return false; } + TSDBKEY k = {.ts = ts, .version = ver}; + if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k)) { + return false; + } + return true; } @@ -1934,22 +1947,20 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI STSRow* pTSRow = NULL; int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); - TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); - if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) { + if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { return doMergeThreeLevelRows(pReader, pBlockScanInfo); } else { // imem + file - if (pBlockScanInfo->imemHasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter, - &pBlockScanInfo->imemHasVal, key); + if (pBlockScanInfo->iiter.hasVal) { + return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, &pBlockScanInfo->iiter, key); } // mem + file - if (pBlockScanInfo->memHasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter, - &pBlockScanInfo->memHasVal, key); + if (pBlockScanInfo->iter.hasVal) { + return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, &pBlockScanInfo->iter,key); } // imem & mem are all empty, only file exist @@ -1973,7 +1984,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* while (1) { // todo check the validate of row in file block { - if (!isValidFileBlockRow(pBlockData, pDumpInfo, pReader)) { + if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { pDumpInfo->rowIndex += step; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); @@ -2018,7 +2029,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; } -static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { +static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { if (pBlockScanInfo->iterInit) { return TSDB_CODE_SUCCESS; } @@ -2038,9 +2049,9 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pReader->pTsdb->mem != NULL) { tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); if (d != NULL) { - code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter); + code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); if (code == TSDB_CODE_SUCCESS) { - pBlockScanInfo->memHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter) != NULL); + pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL); tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 " %s", @@ -2059,9 +2070,9 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* if (pReader->pTsdb->imem != NULL) { tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); if (di != NULL) { - code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter); + code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); if (code == TSDB_CODE_SUCCESS) { - pBlockScanInfo->imemHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter) != NULL); + pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL); tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 " %s", @@ -2076,18 +2087,22 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr); } + initDelSkylineIterator(pBlockScanInfo, pReader, d, di); + pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; } -static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { +int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData) { if (pBlockScanInfo->delSkyline != NULL) { return TSDB_CODE_SUCCESS; } -#if 0 + int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; + SArray* pDelData = taosArrayInit(4, sizeof(SDelData)); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); if (pDelFile) { SDelFReader* pDelFReader = NULL; @@ -2101,27 +2116,51 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb goto _err; } - SDelFile* pDelFileR = pReader->pTsdb->fs->nState->pDelFile; - if (pDelFileR) { - code = tsdbDelFReaderOpen(&pDelFReader, pDelFileR, pTsdb, NULL); - if (code) { - goto _err; - } + code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL); + if (code) { + goto _err; + } - code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL); - if (code) { - goto _err; - } + SDelIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; + SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ); + + code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL); + if (code != TSDB_CODE_SUCCESS) { + goto _err; } + } - code = tsdbBuildDeleteSkyline(pBlockScanInfo->delSkyline, 0, (int32_t)(nDelData - 1), aSkyline); + SDelData* p = NULL; + if (pMemTbData != NULL) { + p = pMemTbData->pHead; + while (p) { + taosArrayPush(pDelData, p); + p = p->pNext; + } } - _err: + if (piMemTbData != NULL) { + p = piMemTbData->pHead; + while (p) { + taosArrayPush(pDelData, p); + p = p->pNext; + } + } + + if (taosArrayGetSize(pDelData) > 0) { + pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY)); + code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline); + } + + taosArrayDestroy(pDelData); + pBlockScanInfo->iter.index = ASCENDING_TRAVERSE(pReader->order)? 0:taosArrayGetSize(pBlockScanInfo->delSkyline) - 1; + pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index; + pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index; return code; -#endif - return 0; +_err: + taosArrayDestroy(pDelData); + return code; } static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) { @@ -2130,13 +2169,13 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - initMemIterator(pScanInfo, pReader); - TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader); + initMemDataIterator(pScanInfo, pReader); + TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); if (pRow != NULL) { key = TSDBROW_KEY(pRow); } - pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader); + pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader); if (pRow != NULL) { TSDBKEY k = TSDBROW_KEY(pRow); if (key.ts > k.ts) { @@ -2230,7 +2269,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; - initMemIterator(pBlockScanInfo, pReader); + initMemDataIterator(pBlockScanInfo, pReader); int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey); @@ -2369,51 +2408,89 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info); // } -TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { - if (!(*hasVal)) { +bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey) { + ASSERT(pKey != NULL); + if (pDelList == NULL) { + return false; + } + + if (*index >= taosArrayGetSize(pDelList) - 1) { + TSDBKEY* last = taosArrayGetLast(pDelList); + if (pKey->ts > last->ts) { + return false; + } else if (pKey->ts == last->ts) { + size_t size = taosArrayGetSize(pDelList); + TSDBKEY* prev = taosArrayGet(pDelList, size - 2); + if (prev->version >= pKey->version) { + return true; + } else { + return false; + } + } else { + ASSERT(0); + } + } else { + TSDBKEY* pCurrent = taosArrayGet(pDelList, *index); + TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1); + + if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) { + return true; + } else { + while (pNext->ts < pKey->ts && (*index) < taosArrayGetSize(pDelList) - 1) { + (*index) += 1; + } + + return false; + } + } +} + +TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) { + if (!pIter->hasVal) { return NULL; } - TSDBROW* pRow = tsdbTbDataIterGet(pIter); + TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); TSDBKEY key = TSDBROW_KEY(pRow); if (outOfTimeWindow(key.ts, &pReader->window)) { - *hasVal = false; + pIter->hasVal = false; return NULL; } - if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) { + // it is a valid data version + if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) && (!hasBeenDropped(pDelList, &pIter->index, &key))) { return pRow; } while (1) { - *hasVal = tsdbTbDataIterNext(pIter); - if (!(*hasVal)) { + pIter->hasVal = tsdbTbDataIterNext(pIter->iter); + if (!pIter->hasVal) { return NULL; } - pRow = tsdbTbDataIterGet(pIter); + pRow = tsdbTbDataIterGet(pIter->iter); key = TSDBROW_KEY(pRow); if (outOfTimeWindow(key.ts, &pReader->window)) { - *hasVal = false; + pIter->hasVal = false; return NULL; } - if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) { + if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer && (!hasBeenDropped(pDelList, &pIter->index, &key))) { return pRow; } } } -int32_t doMergeRowsInBuf(STbDataIter* pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { +int32_t doMergeRowsInBuf(SIterInfo *pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) { while (1) { - *hasVal = tsdbTbDataIterNext(pIter); - if (!(*hasVal)) { + pIter->hasVal = tsdbTbDataIterNext(pIter->iter); + if (!pIter->hasVal) { break; } // data exists but not valid - TSDBROW* pRow = getValidRow(pIter, hasVal, pReader); + TSDBROW* pRow = getValidRow(pIter, pDelList, pReader); if (pRow == NULL) { break; } @@ -2539,15 +2616,14 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { } } -void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow** pTSRow, - STsdbReader* pReader) { +void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader) { SRowMerger merge = {0}; TSDBKEY k = TSDBROW_KEY(pRow); updateSchema(pRow, uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(dIter, hasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); } @@ -2562,18 +2638,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo updateSchema(piRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, piRow, pReader->pSchema); - doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, pRow); - doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } else { updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, piRow); - doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, pTSRow); @@ -2581,33 +2657,34 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey) { - TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); - TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); + TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + SArray* pDelList = pBlockScanInfo->delSkyline; // todo refactor bool asc = ASCENDING_TRAVERSE(pReader->order); - if (pBlockScanInfo->memHasVal) { + if (pBlockScanInfo->iter.hasVal) { TSDBKEY k = TSDBROW_KEY(pRow); if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { pRow = NULL; } } - if (pBlockScanInfo->imemHasVal) { + if (pBlockScanInfo->iiter.hasVal) { TSDBKEY k = TSDBROW_KEY(piRow); if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { piRow = NULL; } } - if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal && pRow != NULL && piRow != NULL) { + if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) { TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); if (ik.ts < k.ts) { // ik.ts < k.ts - doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); + doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader); } else if (k.ts < ik.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader); } else { // ik.ts == k.ts doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); } @@ -2615,13 +2692,13 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } - if (pBlockScanInfo->memHasVal && pRow != NULL) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader); + if (pBlockScanInfo->iter.hasVal && pRow != NULL) { + doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader); return TSDB_CODE_SUCCESS; } - if (pBlockScanInfo->imemHasVal && piRow != NULL) { - doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader); + if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { + doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader); return TSDB_CODE_SUCCESS; } @@ -2686,7 +2763,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e doAppendOneRow(pBlock, pReader, pTSRow); // no data in buffer, return immediately - if (!(pBlockScanInfo->memHasVal || pBlockScanInfo->imemHasVal)) { + if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { break; } @@ -2699,12 +2776,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return TSDB_CODE_SUCCESS; } +// todo refactor, use arraylist instead int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { - // if (pReader->pTableCheckInfo) taosArrayDestroy(pReader->pTableCheckInfo); - // pReader->pTableCheckInfo = createCheckInfoFromUid(pReader, uid); - // if (pReader->pTableCheckInfo == NULL) { - // return TSDB_CODE_TDB_OUT_OF_MEMORY; - // } + ASSERT(pReader != NULL); + taosHashClear(pReader->status.pTableMap); + + STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; + taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); return TDB_CODE_SUCCESS; } @@ -3165,40 +3243,46 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { return pReader->pResBlock->pDataBlock; } -void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { - // if (isEmptyQueryTimeWindow(pReader)) { - // if (pCond->order != pReader->order) { - // pReader->order = pCond->order; - // TSWAP(pReader->window.skey, pReader->window.ekey); - // } +int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { + if (isEmptyQueryTimeWindow(&pReader->window)) { + return TSDB_CODE_SUCCESS; + } - // return; - // } + setQueryTimewindow(pReader, pCond, tWinIdx); - // pReader->order = pCond->order; - // setQueryTimewindow(pReader, pCond, tWinIdx); - // pReader->type = TSDB_QUERY_TYPE_ALL; - // pReader->cur.fid = -1; - // pReader->cur.win = TSWINDOW_INITIALIZER; - // pReader->checkFiles = true; - // pReader->activeIndex = 0; // current active table index - // pReader->locateStart = false; - // pReader->loadExternalRow = pCond->loadExternalRows; - - // if (ASCENDING_TRAVERSE(pCond->order)) { - // assert(pReader->window.skey <= pReader->window.ekey); - // } else { - // assert(pReader->window.skey >= pReader->window.ekey); - // } + pReader->order = pCond->order; + pReader->type = BLOCK_LOAD_OFFSET_ORDER; + pReader->status.loadFromFile = true; + pReader->status.pTableIter = NULL; - // // allocate buffer in order to load data blocks from file - // memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); - // memset(pReader->suppInfo.plist, 0, POINTER_BYTES); + pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows[tWinIdx]); - // tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo); - // tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo); + // allocate buffer in order to load data blocks from file + memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); + memset(pReader->suppInfo.plist, 0, POINTER_BYTES); + + // todo set the correct numOfTables + int32_t numOfTables = 1; + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + + STsdbFSState* pFState = pReader->pTsdb->fs->cState; + initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + + int32_t code = 0; + // no data in files, let's try buffer in memory + if (pReader->status.fileIter.numOfFiles == 0) { + pReader->status.loadFromFile = false; + } else { + code = initForFirstBlockInFile(pReader, pBlockIter); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } - // resetCheckInfo(pReader); + ASSERT(0); + tsdbDebug("%p reset tsdbreader in query %s", pReader, numOfTables, pReader->idStr); + return code; } int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9742ec720f..8ea03d647a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2866,7 +2866,7 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { tsdbSetTableId(pInfo->dataReader, uid); int64_t oldSkey = pInfo->cond.twindows[0].skey; pInfo->cond.twindows[0].skey = ts + 1; - tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0); pInfo->cond.twindows[0].skey = oldSkey; pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a1ea712bbf..916c8939c7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -440,7 +440,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { } pTableScanInfo->curTWinIdx += 1; if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) { - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx); + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx); } } @@ -455,7 +455,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } // do prepare for the next round table scan operation - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); pTableScanInfo->curTWinIdx = 0; } } @@ -464,7 +464,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < total) { if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, 0); - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); pTableScanInfo->curTWinIdx = 0; } @@ -482,7 +482,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { } pTableScanInfo->curTWinIdx += 1; if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) { - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx); + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx); } } @@ -498,7 +498,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); pTableScanInfo->curTWinIdx = 0; } } @@ -526,7 +526,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); - tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0); pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; } @@ -560,7 +560,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); // tsdbSetTableList(pInfo->dataReader, tableList); - tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0); pInfo->curTWinIdx = 0; pInfo->scanTimes = 0; @@ -859,7 +859,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->curTWinIdx = 0; - // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); + // tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); // if (!pTableScanInfo->dataReader) { // return false; // } diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index ee9e87cf04..6880f290f5 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -131,6 +131,7 @@ if $data[0][1] != $consumerId then return -1 endi if $data[0][2] != $expectmsgcnt then + print expect $expectmsgcnt , actual $data02 return -1 endi if $data[0][3] != $expectmsgcnt then -- GitLab