diff --git a/include/libs/function/function.h b/include/libs/function/function.h index d5da306fd297dd49f4753aa01c6423cb9dd82e9c..faf35bf03c82c1ecf7a399a86350719c3bc93ec3 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -139,7 +139,7 @@ typedef struct SqlFunctionCtx { struct SExprInfo *pExpr; struct SDiskbasedBuf *pBuf; struct SSDataBlock *pSrcBlock; - struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity + struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity int32_t curBufPage; bool increase; bool isStream; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ec27ba8ce659e454b768945aca50fb071d4e7b4f..430aadb5581a51259a203874af00230df41ad26d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -128,8 +128,10 @@ typedef struct STsdbReader STsdbReader; #define TIMEWINDOW_RANGE_CONTAINED 1 #define TIMEWINDOW_RANGE_EXTERNAL 2 -#define LASTROW_RETRIEVE_TYPE_ALL 0x1 -#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 +#define CACHESCAN_RETRIEVE_TYPE_ALL 0x1 +#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2 +#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 +#define CACHESCAN_RETRIEVE_LAST 0x8 int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, @@ -146,9 +148,9 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); -int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); -int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); -int32_t tsdbLastrowReaderClose(void *pReader); +int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); +int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); +int32_t tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 66843d9a2844c44e77e798ab47032ef75370a544..ea9a7ec7d9b3df80edbb1e5f93db5b2420f908e5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -18,7 +18,7 @@ #include "tcommon.h" #include "tsdb.h" -typedef struct SLastrowReader { +typedef struct SCacheRowsReader { SVnode* pVnode; STSchema* pSchema; uint64_t uid; @@ -27,9 +27,9 @@ typedef struct SLastrowReader { int32_t type; int32_t tableIndex; // currently returned result tables SArray* pTableList; // table id list -} SLastrowReader; +} SCacheRowsReader; -static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { +static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; @@ -61,8 +61,10 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade pBlock->info.rows += 1; } -int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { - SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); +int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { + *pReader = NULL; + + SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -81,9 +83,17 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->pTableList = pTableIdList; p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); + if (p->transferBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) { p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes); + if (p->transferBuf[i] == NULL) { + tsdbCacherowsReaderClose(p); + return TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -91,8 +101,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, return TSDB_CODE_SUCCESS; } -int32_t tsdbLastrowReaderClose(void* pReader) { - SLastrowReader* p = pReader; +int32_t tsdbCacherowsReaderClose(void* pReader) { + SCacheRowsReader* p = pReader; if (p->pSchema != NULL) { for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { @@ -107,28 +117,56 @@ int32_t tsdbLastrowReaderClose(void* pReader) { return TSDB_CODE_SUCCESS; } -int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { +static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, LRUHandle** h) { + int32_t code = TSDB_CODE_SUCCESS; + if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { + code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (*h != NULL) { + *pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); + } + } else { + code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (*h != NULL) { + SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h); + tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema); + } + } + + return code; +} + +int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } - SLastrowReader* pr = pReader; + SCacheRowsReader* pr = pReader; + int32_t code = TSDB_CODE_SUCCESS; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; LRUHandle* h = NULL; STSRow* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); // retrieve the only one last row of all tables in the uid list. - if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { + if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { int64_t lastKey = INT64_MIN; bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - // int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - if (code != TSDB_CODE_SUCCESS) { + code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); + if (code != TSDB_CODE_SUCCESS) { return code; } @@ -136,9 +174,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t continue; } - pRow = (STSRow*)taosLRUCacheValue(lruCache, h); - // SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h); - // tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema); if (pRow->ts > lastKey) { // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // appended or not. @@ -155,25 +190,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t tsdbCacheRelease(lruCache, h); } - } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { + } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - - int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - // int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); - if (code != TSDB_CODE_SUCCESS) { + code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); + if (code != TSDB_CODE_SUCCESS) { return code; } - // no data in the table of Uid if (h == NULL) { continue; } - pRow = (STSRow*)taosLRUCacheValue(lruCache, h); - // SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h); - // tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema); - saveOneRow(pRow, pResBlock, pr, slotIds); taosArrayPush(pTableUidList, &pKeyInfo->uid); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0a0ef11774ce67f177bd7039a011bb5d7bcce722..9aeec5794833014bf6f7fe1f4ae9204616c4c96c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,9 +16,9 @@ #include "osDef.h" #include "tsdb.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ALL_ROWS_CHECKED_INDEX (INT16_MIN) -#define DEFAULT_ROW_INDEX_VAL (-1) +#define INITIAL_ROW_INDEX_VAL (-1) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -234,7 +234,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; + STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { info.lastKey = pTsdbReader->window.skey; @@ -266,7 +266,9 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) { p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); } - p->delSkyline = taosArrayDestroy(p->delSkyline); + p->fileDelIndex = -1; + p->delSkyline = taosArrayDestroy(p->delSkyline); + p->lastBlockDelIndex = INITIAL_ROW_INDEX_VAL; } } @@ -414,7 +416,7 @@ _err: return false; } -static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) { +static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; @@ -423,7 +425,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashOb } else { taosArrayClear(pIter->blockList); } - pIter->pTableMap = pTableMap; } static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } @@ -579,7 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { } // reset the index in last block when handing a new file - px->indexInBlockL = DEFAULT_ROW_INDEX_VAL; + px->indexInBlockL = INITIAL_ROW_INDEX_VAL; tMapDataClear(&px->mapData); taosArrayClear(px->pBlockList); } @@ -2403,7 +2404,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); int32_t index = pScanInfo->indexInBlockL; - if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { + if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); if (!hasData) { // current table does not have rows in last block, try next table bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); @@ -2470,7 +2471,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // note: the lastblock may be null here initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); - if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { + if (pScanInfo->indexInBlockL == INITIAL_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); } } @@ -2582,7 +2583,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(pBlockIter, pReader->order); } SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader; @@ -2654,7 +2655,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { initBlockDumpInfo(pReader, pBlockIter); } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(pBlockIter, pReader->order); goto _begin; } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3276,7 +3277,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { ASSERT(pReader != NULL); taosHashClear(pReader->status.pTableMap); - STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; + STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL}; taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); return TDB_CODE_SUCCESS; } @@ -3372,7 +3373,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl SDataBlockIter* pBlockIter = &pReader->status.blockIter; initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { @@ -3393,7 +3394,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader); - resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap); + resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order); // no data in files, let's try buffer in memory if (pPrevReader->status.fileIter.numOfFiles == 0) { @@ -3696,7 +3697,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDataFReaderClose(&pReader->pFileReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); + resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockScanInfo(pReader->status.pTableMap); int32_t code = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4e86333cf760268646939c4fa14338942998af7a..05bdc3970191fc6e20e35c473d3a8ad841a24995 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -908,7 +908,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b31fa279e57ad3436d19264071959e91fe4709f7..94d9d0cadbd1cf21ac8303a4bee7b86da9695f3c 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -25,24 +25,27 @@ #include "thash.h" #include "ttypes.h" -static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); +static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } pInfo->readHandle = *readHandle; - pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); + pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID); - int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); + code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -55,13 +58,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead // partition by tbname if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { - pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL; - tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|CACHESCAN_RETRIEVE_LAST_ROW; + code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, + taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); } else { // by tags - pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE; + pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|CACHESCAN_RETRIEVE_LAST_ROW; } if (pScanNode->scan.pScanPseudoCols != NULL) { @@ -80,19 +87,19 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); pOperator->cost.openCost = 0; return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); + pTaskInfo->code = code; + destroyLastrowScanOperator(pInfo); taosMemoryFree(pOperator); return NULL; } -SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { +SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -109,14 +116,14 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); // check if it is a group by tbname - if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) { + if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { blockDataCleanup(pInfo->pBufferredRes); taosArrayClear(pInfo->pUidList); - int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); + int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // check for tag values @@ -172,11 +179,11 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { while (pInfo->currentGroupIndex < totalGroups) { SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); - tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, + tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); - int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -200,7 +207,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { } } - tsdbLastrowReaderClose(pInfo->pLastrowReader); + tsdbCacherowsReaderClose(pInfo->pLastrowReader); return pInfo->pRes; } } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6f4c84f9c0159980c9b1cfed5315a1146ce55f24..dc9217bf65105e36e7b2b4865419a098623af1b9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4138,7 +4138,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); + pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 013c58cc4501c091cc745330b584174064aff404..32d0472a50c596ed4a125a961555124aa408be3f 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3643,7 +3643,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); - int16_t type = pCtx->input.pData[0]->info.type; + int16_t type = pCtx->pExpr->base.resSchema.type; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index ac2128dd70b70b88dd32617067ff0ce8e0d3172a..fbade7b07452344160fd60e427dae0bea689f667 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -293,16 +293,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { } } - // int32_t pos = listNEles(pBuf->lruList); - // SListIter iter1 = {0}; - // tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD); - // SListNode* pn1 = NULL; - // while((pn1 = tdListNext(&iter1)) != NULL) { - // SPageInfo* pageInfo = *(SPageInfo**) pn1->data; - // printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1); - // pos -= 1; - // } - return pn; } @@ -661,10 +651,14 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); - printf( - "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", - ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages, - ps->loadBytes / (1024.0 * ps->loadPages)); + if (ps->loadPages > 0) { + printf( + "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", + ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, + ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); + } else { + printf("no page loaded\n"); + } } void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {