From 34e4980a5131446a6a67a2e14d4933d983bd2740 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Jul 2022 15:19:02 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 436 +++++++++++++------------ source/libs/executor/src/executil.c | 24 +- 2 files changed, 249 insertions(+), 211 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ce9bfc54d8..a075970152 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -215,8 +215,6 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { } static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) { - ASSERT(numOfTables >= 1); - // allocate buffer in order to load data blocks from file // todo use simple hash instead SHashObj* pTableMap = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -265,23 +263,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK // } // } -// // only one table, not need to sort again -// static SArray* createCheckInfoFromCheckInfo(STableBlockScanInfo* pCheckInfo, TSKEY skey, SArray** psTable) { -// SArray* pNew = taosArrayInit(1, sizeof(STableBlockScanInfo)); - -// STableBlockScanInfo info = {.lastKey = skey}; - -// info.tableId = pCheckInfo->tableId; -// taosArrayPush(pNew, &info); -// return pNew; -// } - -// todo static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) { ASSERT(pWindow != NULL); - bool asc = ASCENDING_TRAVERSE(order); - return false; -// return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey)); + return pWindow->skey > pWindow->ekey; } // // Update the query time window according to the data time to live(TTL) information, in order to avoid to return @@ -347,8 +331,8 @@ static int32_t initFileIterator(SFilesetIter* pIter, const STsdbFSState* pFState } static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { - bool asc = ASCENDING_TRAVERSE(pIter->order); - int32_t step = asc? 1:-1; + bool asc = ASCENDING_TRAVERSE(pIter->order); + int32_t step = asc ? 1 : -1; pIter->index += step; if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) { @@ -357,26 +341,36 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { // check file the time range of coverage STimeWindow win = {0}; - pReader->status.pCurrentFileset = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); - int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + while(1) { + pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index); - // todo file range check -// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); + int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } - // current file are not overlapped with query time window, ignore remain files -// if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.ekey)) { -// tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, -// pReader->window.skey, pReader->window.ekey, pReader->idStr); -// return false; -// } + int32_t fid = pReader->status.pCurrentFileset->fid; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); + + // current file are no longer overlapped with query time window, ignore remain files + if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) { + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, + pReader->window.skey, pReader->window.ekey, pReader->idStr); + return false; + } + + if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) { + pIter->index += step; + continue; + } - return true; + tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, fid, + pReader->window.skey, pReader->window.ekey, pReader->idStr); + return true; + } - _err: +_err: return false; } @@ -394,6 +388,29 @@ static void initReaderStatus(SReaderStatus* pStatus) { pStatus->loadFromFile = true; } +static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) { + SSDataBlock* pResBlock = createDataBlock(); + if (pResBlock == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < pCond->numOfCols; ++i) { + SColumnInfoData colInfo = {{0}, 0}; + colInfo.info = pCond->colList[i]; + blockDataAppendColInfo(pResBlock, &colInfo); + } + + int32_t code = blockDataEnsureCapacity(pResBlock, capacity); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + taosMemoryFree(pResBlock); + return NULL; + } + + return pResBlock; +} + static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) { int32_t code = 0; STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader)); @@ -427,30 +444,27 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd // todo remove this setQueryTimewindow(pReader, pCond, 0); + ASSERT (pCond->numOfCols > 0); - if (pCond->numOfCols > 0) { - limitOutputBufferSize(pCond, &pReader->capacity); - - // allocate buffer in order to load data blocks from file - pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); - if (pReader->suppInfo.pstatis == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - pReader->pResBlock = createDataBlock(); - for (int32_t i = 0; i < pCond->numOfCols; ++i) { - SColumnInfoData colInfo = {{0}, 0}; - colInfo.info = pCond->colList[i]; - blockDataAppendColInfo(pReader->pResBlock, &colInfo); - } + limitOutputBufferSize(pCond, &pReader->capacity); - blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity); + // allocate buffer in order to load data blocks from file + SBlockLoadSuppInfo* pSup = &pReader->suppInfo; + pSup->pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); + pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES); + if (pSup->pstatis == NULL || pSup->plist == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } - setColumnIdSlotList(pReader, pReader->pResBlock); - pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES); + pReader->pResBlock = createResBlock(pCond, pReader->capacity); + if (pReader->pResBlock == NULL) { + code = terrno; + goto _end; } + setColumnIdSlotList(pReader, pReader->pResBlock); + STsdbFSState* pFState = pReader->pTsdb->fs->cState; initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); @@ -829,7 +843,6 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int int32_t step = ASCENDING_TRAVERSE(order)? 1:-1; pDumpInfo->allDumped = true; - pDumpInfo->rowIndex = ASCENDING_TRAVERSE(order)? 0:pBlock->nRow-1; pDumpInfo->lastKey = pBlock->maxKey.ts + step; } @@ -847,10 +860,11 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_ } } -// todo consider the output buffer size -static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { - int64_t st = taosGetTimestampUs(); +static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; + SBlockData* pBlockData = &pStatus->fileBlockData; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SSDataBlock* pResBlock = pReader->pResBlock; @@ -859,25 +873,29 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - uint8_t *pb = NULL, *pb1 = NULL; - int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + int64_t st = taosGetTimestampUs(); SColVal cv = {0}; int32_t colIndex = 0; bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; - int32_t rowIndex = 0; + int32_t rowIndex = 0; int32_t remain = asc ? (pBlockData->nRow - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex + 1); + int32_t endIndex = 0; + if (remain <= pReader->capacity) { + endIndex = pBlockData->nRow; + } else { + endIndex = pDumpInfo->rowIndex + step * pReader->capacity; + remain = pReader->capacity; + } + int32_t i = 0; SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) { + for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) { colDataAppend(pColData, rowIndex++, (const char*)&pBlockData->aTSKEY[j], false); } i += 1; @@ -890,7 +908,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex); if (pData->cid == pColData->info.colId) { - for (int32_t j = pDumpInfo->rowIndex; j < pBlockData->nRow && j >= 0; j += step) { + for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) { tColDataGetValue(pData, j, &cv); doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); } @@ -909,15 +927,48 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI i += 1; } - pResBlock->info.rows = pBlockData->nRow; - setBlockAllDumped(&pReader->status.fBlockDumpInfo, pBlock, pReader->order); + pResBlock->info.rows = remain; + pDumpInfo->rowIndex += step*remain; + + setBlockAllDumped(pDumpInfo, pBlock, pReader->order); int64_t elapsedTime = (taosGetTimestampUs() - st); pReader->cost.blockLoadTime += elapsedTime; + int32_t unDumpedRows = asc? pBlock->nRow - pDumpInfo->rowIndex: pDumpInfo->rowIndex + 1; tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s", - pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow, + ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s", + pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, + pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); + + return TSDB_CODE_SUCCESS; +} + +// todo consider the output buffer size +static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { + int64_t st = taosGetTimestampUs(); + + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); + SSDataBlock* pResBlock = pReader->pResBlock; + int32_t numOfCols = blockDataGetNumOfCols(pResBlock); + + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + + uint8_t *pb = NULL, *pb1 = NULL; + int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + int64_t elapsedTime = (taosGetTimestampUs() - st); + pReader->cost.blockLoadTime += elapsedTime; + + pDumpInfo->allDumped = false; + tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s", + pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -2022,29 +2073,6 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { return true; } -//static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) { -// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; -// SQueryFilePos* cur = &pTsdbReadHandle->cur; -// -// while (1) { -// int32_t code = loadFileDataBlock(pTsdbReadHandle, pNext->compBlock, pNext->pTableCheckInfo, exists); -// if (code != TSDB_CODE_SUCCESS || *exists) { -// return code; -// } -// -// if ((cur->slot == pTsdbReadHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || -// (cur->slot == 0 && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { -// // all data blocks in current file has been checked already, try next file if exists -// return getFirstFileDataBlock(pTsdbReadHandle, exists); -// } else { // next block of the same file -// cur->slot += step; -// cur->mixBlock = false; -// cur->blockCompleted = false; -// pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot]; -// } -// } -//} - // static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists) { // pTsdbReadHandle->numOfBlocks = 0; // SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -2283,52 +2311,65 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn return TSDB_CODE_SUCCESS; } } - } else if (pBlockScanInfo->imemHasVal) { - TSDBKEY ik = TSDBROW_KEY(piRow); - if (key <= ik.ts) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); + } else { + if (pBlockScanInfo->imemHasVal) { + TSDBKEY ik = TSDBROW_KEY(piRow); + if (key <= ik.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); - if (ik.ts == mergeTs) { + if (ik.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + if (ik.ts < key) { doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); } - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } - if (ik.ts < key) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - return TSDB_CODE_SUCCESS; - } - } else { // pBlockScanInfo->memHasVal != NULL - TSDBKEY k = TSDBROW_KEY(pRow); - if (key <= k.ts) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - tRowMergerInit(&merge, &fRow, pReader->pSchema); + if (pBlockScanInfo->memHasVal) { // pBlockScanInfo->memHasVal != NULL + TSDBKEY k = TSDBROW_KEY(pRow); + if (key <= k.ts) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); - doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); - if (k.ts == mergeTs) { + if (k.ts == mergeTs) { + doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + return TSDB_CODE_SUCCESS; + } + + if (k.ts < key) { doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); } - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } - if (k.ts < key) { - doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); - tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); - return TSDB_CODE_SUCCESS; - } + // imem & mem are all empty + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendOneRow(pReader->pResBlock, pReader, pTSRow); } return TSDB_CODE_SUCCESS; @@ -2343,8 +2384,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo->tbBlockIdx == pFBlock->tbBlockIdx) { // still in the same file block now - - if (pDumpInfo->rowIndex >= pBlock->nRow) { + if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { + setBlockAllDumped(pDumpInfo, pBlock, pReader->order); break; } @@ -2497,10 +2538,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { - SBlockData data = {0}; - tBlockDataInit(&data); - - code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); + tBlockDataInit(&pStatus->fileBlockData); + code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2736,32 +2775,64 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + bool asc = ASCENDING_TRAVERSE(pReader->order); + int32_t step = asc? 1:-1; int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - if (pDumpInfo->rowIndex < pBlockData->nRow - 1) { - if (pBlockData->aTSKEY[pDumpInfo->rowIndex + 1] == key) { - int32_t rowIndex = pDumpInfo->rowIndex + 1; - while (pBlockData->aTSKEY[rowIndex] == key) { - if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer) { - continue; + if (asc) { // todo refactor + if (pDumpInfo->rowIndex < pBlockData->nRow - 1) { + if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) { + int32_t rowIndex = pDumpInfo->rowIndex + step; + + while (pBlockData->aTSKEY[rowIndex] == key) { + if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) { + continue; + } + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); + tRowMerge(pMerger, &fRow); + rowIndex += step; } - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tRowMerge(pMerger, &fRow); - rowIndex += 1; + pDumpInfo->rowIndex = rowIndex; + } else { + pDumpInfo->rowIndex += step; + } + } else { // last row of current block, check if current block is overlapped with neighbor block + pDumpInfo->rowIndex += step; + bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); + if (overlap) { // load next block + ASSERT(0); } - - pDumpInfo->rowIndex = rowIndex; } - } else { // last row of current block, check if current block is overlapped with neighbor block - pDumpInfo->rowIndex += 1; - bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); - if (overlap) { - // load next block + } else { + if (pDumpInfo->rowIndex > 0) { + if (pBlockData->aTSKEY[pDumpInfo->rowIndex + step] == key) { + int32_t rowIndex = pDumpInfo->rowIndex + step; + + while (pBlockData->aTSKEY[rowIndex] == key) { + if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer || + pBlockData->aVersion[rowIndex] < pReader->verRange.minVer) { + continue; + } + + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); + tRowMerge(pMerger, &fRow); + rowIndex += step; + } + pDumpInfo->rowIndex = rowIndex; + } else { + pDumpInfo->rowIndex += step; + } + } else { // last row of current block, check if current block is overlapped with previous neighbor block + pDumpInfo->rowIndex += step; + bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo); + if (overlap) { // load next block + ASSERT(0); + } } - } return TSDB_CODE_SUCCESS; @@ -2889,23 +2960,11 @@ int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY max do { STSRow* pTSRow = NULL; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow); - doAppendOneRow(pBlock, pReader, pTSRow); - - if (pBlockScanInfo->memHasVal) { - TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iter); - TSDBKEY k = TSDBROW_KEY(pRow); - if (k.ts >= maxKey.ts) { - break; - } + if (pTSRow == NULL) { + break; } - if (pBlockScanInfo->imemHasVal) { - TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iiter); - TSDBKEY k = TSDBROW_KEY(pRow); - if (k.ts >= maxKey.ts) { - break; - } - } + doAppendOneRow(pBlock, pReader, pTSRow); // no data in buffer, return immediately if (!(pBlockScanInfo->memHasVal || pBlockScanInfo->imemHasVal)) { @@ -3127,34 +3186,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } -#if 0 - // int32_t code = setCurrentSchema(pVnode, pReader); - // if (code != TSDB_CODE_SUCCESS) { - // terrno = code; - // return NULL; - // } - - // int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn); - // int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData; - - // STSchema* pSchema = pReader->pSchema; - - // int32_t i = 0, j = 0; - // while (i < numOfCols && j < pSchema->numOfCols) { - // if (ids[i] == pSchema->columns[j].colId) { - // pReader->suppInfo.slotIds[i] = j; - // i++; - // j++; - // } else if (ids[i] > pSchema->columns[j].colId) { - // j++; - // } else { - // // tsdbReaderClose(pTsdbReadHandle); - // terrno = TSDB_CODE_INVALID_PARA; - // return NULL; - // } - // } -#endif - tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -3362,24 +3393,25 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { if (pStatus->composedDataBlock) { return pReader->pResBlock->pDataBlock; - } else { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + } - int32_t code = tBlockDataInit(&pStatus->fileBlockData); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } + int32_t code = tBlockDataInit(&pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } - return pReader->pResBlock->pDataBlock; + code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; } + + copyBlockDataToSDataBlock(pReader, pBlockScanInfo); + return pReader->pResBlock->pDataBlock; } void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 74debd1531..fddcd0c51b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -285,13 +285,16 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; + pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); - if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY; + if (pListInfo->pTableList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } uint64_t tableUid = pScanNode->uid; pListInfo->suid = pScanNode->suid; - + SNode* pTagCond = (SNode*)pListInfo->pTagCond; SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond; if (pScanNode->tableType == TSDB_SUPER_TABLE) { @@ -300,7 +303,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, .metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid}; SArray* res = taosArrayInit(8, sizeof(uint64_t)); - //code = doFilterTag(pTagIndexCond, &metaArg, res); + // code = doFilterTag(pTagIndexCond, &metaArg, res); code = TSDB_CODE_INDEX_REBUILDING; if (code == TSDB_CODE_INDEX_REBUILDING) { code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); @@ -322,24 +325,27 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } - if(pTagCond){ + if (pTagCond) { int32_t i = 0; - while(i < taosArrayGetSize(pListInfo->pTableList)) { + while (i < taosArrayGetSize(pListInfo->pTableList)) { STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i); - bool isOk = isTableOk(info, pTagCond, metaHandle); - if(!isOk){ + bool isOk = isTableOk(info, pTagCond, metaHandle); + if (!isOk) { taosArrayRemove(pListInfo->pTableList, i); continue; } i++; } } - }else { // Create one table group. + } else { // Create one table group. STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0}; taosArrayPush(pListInfo->pTableList, &info); } + pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES); - if(pListInfo->pGroupList == NULL) return TSDB_CODE_OUT_OF_MEMORY; + if(pListInfo->pGroupList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } //put into list as default group, remove it if grouping sorting is required later taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList); -- GitLab