From 6669c3a5ad392f242185f7a89b3fb2c2566ed766 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 18 Apr 2023 17:13:14 +0800 Subject: [PATCH] feat: last queries with tags output can be read from cache --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 17 +++++------ source/libs/executor/src/cachescanoperator.c | 32 ++++++++++---------- source/libs/planner/src/planOptimizer.c | 21 ++++++++----- 3 files changed, 37 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 95981c2f08..f537b7d6e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 p->ts = pCol->ts; p->colVal = pCol->colVal; singleTableLastTs = pCol->ts; - - // only set value for last row query - if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { - if (taosArrayGetSize(pTableUidList) == 0) { - taosArrayPush(pTableUidList, &pKeyInfo->uid); - } else { - taosArraySet(pTableUidList, 0, &pKeyInfo->uid); - } - } } } else { SLastCol* p = taosArrayGet(pLastCols, slotId); @@ -417,6 +408,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } + if (0 == i) { + if (taosArrayGetSize(pTableUidList) == 0) { + taosArrayPush(pTableUidList, &pKeyInfo->uid); + } else { + taosArraySet(pTableUidList, 0, &pKeyInfo->uid); + } + } + tsdbCacheRelease(lruCache, h); } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index f6fc332b37..61f024ebb7 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); -#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) +#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; tableListDestroy(pTableListInfo); @@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, + pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); } - setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = @@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->cost.openCost = 0; return pOperator; - _error: +_error: pTaskInfo->code = code; destroyCacheScanOperator(pInfo); taosMemoryFree(pOperator); @@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } SCacheRowsScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = pInfo->pTableList; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableListInfo* pTableList = pInfo->pTableList; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); @@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pRes->info.rows = 1; SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, - pRes->info.rows, GET_TASKID(pTaskInfo), NULL); + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, + pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } STableKeyInfo* pList = NULL; - int32_t num = 0; + int32_t num = 0; int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); if (code != TSDB_CODE_SUCCESS) { @@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->pRes->info.id.groupId = pKeyInfo->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { - ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); - pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); - code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, - GET_TASKID(pTaskInfo), NULL); + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC return TSDB_CODE_SUCCESS; } - size_t size = taosArrayGetSize(pColMatchInfo->pList); + size_t size = taosArrayGetSize(pColMatchInfo->pList); SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); for (int32_t i = 0; i < size; ++i) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 52bb03466c..d3a03b7b75 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group *pNotOptimize = false; return TSDB_CODE_SUCCESS; } - + switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: { SScanLogicNode* pScan = (SScanLogicNode*)pNode; @@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt { bool hasCol; } SLastRowScanOptLastParaCkCxt; -static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { +static EDealRes lastRowScanOptLastParaIsTagImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SLastRowScanOptLastParaCkCxt* pCxt = pContext; if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { @@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static bool lastRowScanOptLastParaCheck(SNode* pExpr) { +static bool lastRowScanOptLastParaIsTag(SNode* pExpr) { SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false}; - nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt); - return !cxt.hasTag && cxt.hasCol; + nodesWalkExpr(pExpr, lastRowScanOptLastParaIsTagImpl, &cxt); + return cxt.hasTag && !cxt.hasCol; } static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) { @@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { SFunctionNode* pAggFunc = (SFunctionNode*)pFunc; if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { - if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) { + if (hasSelectFunc) { return false; } hasLastFunc = true; - } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { + } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) { if (hasLastFunc) { return false; } hasSelectFunc = true; + } else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { + if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) { + return false; + } } else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) { return false; } @@ -2282,6 +2286,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt); nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1)); } + } else if (FUNCTION_TYPE_GROUP_KEY == funcType) { + nodesListMakeAppend(&cxt.pLastCols, nodesListGetNode(pFunc->pParameterList, 0)); } } @@ -2291,7 +2297,6 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic if (NULL != cxt.pLastCols) { cxt.doAgg = false; lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols); - NODES_DESTORY_LIST(pScan->pScanPseudoCols); lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols); nodesClearList(cxt.pLastCols); } -- GitLab