diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 85424fd7de24021b4506966413d87f9abde6f159..43ee54f65d8ab29cea92c453624b0b64fa449350 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -366,6 +366,7 @@ typedef struct STagScanInfo { int32_t curPos; SReadHandle readHandle; STableListInfo* pTableListInfo; + SLimitNode* pSlimit; } STagScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2389c7252ef56265f0e695a97fd025b4a4a09675..b7315db204c747659be7f038524b4b719bbe6bb0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2512,6 +2512,53 @@ _error: return NULL; } +static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTaskInfo, STagScanInfo* pInfo, + const SExprInfo* pExprInfo, const SSDataBlock* pRes, int32_t size, const char* str, + int32_t* count, SMetaReader* mr) { + STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); + int32_t code = metaGetTableEntryByUid(mr, item->uid); + tDecoderClear(&(*mr).coder); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); + metaReaderClear(mr); + T_LONG_JMP(pTaskInfo->env, terrno); + } + + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); + + // refactor later + if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { + STR_TO_VARSTR(str, (*mr).me.name); + colDataSetVal(pDst, (*count), str, false); + } else { // it is a tag value + STagVal val = {0}; + val.cid = pExprInfo[j].base.pParam[0].pCol->colId; + const char* p = metaGetTableTagVal((*mr).me.ctbEntry.pTags, pDst->info.type, &val); + + char* data = NULL; + if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { + data = tTagValToData((const STagVal*)p, false); + } else { + data = (char*)p; + } + colDataSetVal(pDst, (*count), data, + (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + + if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && + data != NULL) { + taosMemoryFree(data); + } + } + } + + (*count) += 1; + if (++pInfo->curPos >= size) { + setOperatorCompleted(pOperator); + } +} + static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2536,47 +2583,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); - int32_t code = metaGetTableEntryByUid(&mr, item->uid); - tDecoderClear(&mr.coder); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), - GET_TASKID(pTaskInfo)); - metaReaderClear(&mr); - T_LONG_JMP(pTaskInfo->env, terrno); - } - - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); - - // refactor later - if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { - STR_TO_VARSTR(str, mr.me.name); - colDataSetVal(pDst, count, str, false); - } else { // it is a tag value - STagVal val = {0}; - val.cid = pExprInfo[j].base.pParam[0].pCol->colId; - const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val); - - char* data = NULL; - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { - data = tTagValToData((const STagVal*)p, false); - } else { - data = (char*)p; - } - colDataSetVal(pDst, count, data, - (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); - - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && - data != NULL) { - taosMemoryFree(data); - } - } - } - - count += 1; - if (++pInfo->curPos >= size) { - setOperatorCompleted(pOperator); + doTagScanOneTable(pOperator, pTaskInfo, pInfo, pExprInfo, pRes, size, str, &count, &mr); + if (pInfo->pSlimit != NULL) { + pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name)); + break; } } @@ -2628,6 +2638,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; + pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 07ea110d7e68382a4872e4560a21eca90ac10332..4ece9be30415913a29e97a99866ebdc98697e9d5 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2418,6 +2418,34 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { return true; } +static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) { + SLogicNode* pNode = pTableScanNode->pParent; + while (NULL != pNode) { + if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) || + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) { + return NULL; + } + if (NULL != pNode->pSlimit) { + return pNode; + } + pNode = pNode->pParent; + } + return NULL; +} + +static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) { + if (NULL != pTableScanNode->pSlimit) { + return; + } + + SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode); + if (NULL != pNode) { + //TODO: only set the slimit now. push down slimit later + pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit); + } + return; +} + static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized); if (NULL == pScanNode) { @@ -2458,6 +2486,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp NODES_CLEAR_LIST(pAgg->pChildren); } nodesDestroyNode((SNode*)pAgg); + tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode); pCxt->optimized = true; return TSDB_CODE_SUCCESS; }