diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 89fbc92992bf783b1d8896fbf636f2468b6fa4c6..3d86adb573cd27dfce3b93409b96a11b47b7aaf5 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -146,7 +146,8 @@ bool fmIsBuiltinFunc(const char* pFunc); bool fmIsAggFunc(int32_t funcId); bool fmIsScalarFunc(int32_t funcId); -bool fmIsNonstandardSQLFunc(int32_t funcId); +bool fmIsVectorFunc(int32_t funcId); +bool fmIsIndefiniteRowsFunc(int32_t funcId); bool fmIsStringFunc(int32_t funcId); bool fmIsDatetimeFunc(int32_t funcId); bool fmIsSelectFunc(int32_t funcId); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 7ca4ca917297e5042ae87c9aaba244a881e6f0a3..6c4d14ffa10be13974e4651868fda955e41cebb7 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -54,6 +54,7 @@ typedef struct SScanLogicNode { int64_t sliding; int8_t intervalUnit; int8_t slidingUnit; + SNode* pTagCond; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -343,6 +344,7 @@ typedef struct SSubplan { SNodeList* pParents; // the data destination subplan, get data from current subplan SPhysiNode* pNode; // physical plan of current subplan SDataSinkNode* pDataSink; // data of the subplan flow into the datasink + SNode* pTagCond; } SSubplan; typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 298dffcc839e22226a89932b2571a90ffaa197d0..18ddd11fc5648407b450afeff3deeee427060434 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -236,7 +236,7 @@ typedef struct SSelectStmt { bool isTimeOrderQuery; bool hasAggFuncs; bool hasRepeatScanFuncs; - bool hasNonstdSQLFunc; + bool hasIndefiniteRowsFunc; } SSelectStmt; typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b1b0e69084c7a35ef9bdabacd40515f46b8a7c6f..5abab0dd158bd72e8c82cffe183e73f3592f7abc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -107,7 +107,7 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); -static void releaseQueryBuf(size_t numOfTables); +static void releaseQueryBuf(size_t numOfTables); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); @@ -155,8 +155,9 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); +static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, + SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, + SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); @@ -183,10 +184,10 @@ static int compareRowData(const void* a, const void* b, const void* userData) { int16_t offset = supporter->dataOffset; return 0; -// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); -// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); + // char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); + // char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); -// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; + // return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; } // setup the output buffer for each operator @@ -583,8 +584,9 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, + SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, + int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily bool hasAgg = pCtx[k].input.colDataAggIsSet; @@ -666,8 +668,8 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC } } -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, - bool createDummyCol) { +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, + int32_t scanFlag, bool createDummyCol) { if (pBlock->pBlockAgg != NULL) { doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); } else { @@ -718,7 +720,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, - int32_t scanFlag, bool createDummyCol) { + int32_t scanFlag, bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { @@ -726,7 +728,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pCtx[i].input.numOfRows = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; - pCtx[i].scanFlag = scanFlag; + pCtx[i].scanFlag = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; pInput->uid = pBlock->info.uid; @@ -835,7 +837,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; SScalarParam dest = {.columnData = &idata}; - int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); + int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); return code; @@ -853,7 +855,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc // _rowts/_c0, not tbname column if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { // do nothing - } else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) { + } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]); pfCtx->fpSet.init(&pCtx[k], pResInfo); @@ -951,14 +953,14 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } -// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { -// // return QUERY_IS_ASC_QUERY(pQueryAttr); -// } -// -// // denote the order type -// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { -// // return pCtx->param[0].i == pQueryAttr->order.order; -// } + // if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { + // // return QUERY_IS_ASC_QUERY(pQueryAttr); + // } + // + // // denote the order type + // if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { + // // return pCtx->param[0].i == pQueryAttr->order.order; + // } // in the reverse table scan, only the following functions need to be executed // if (IS_REVERSE_SCAN(pRuntimeEnv) || @@ -1073,19 +1075,19 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu for (int32_t i = 0; i < numOfOutput; ++i) { if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) { pValCtx[num++] = &pCtx[i]; - } else if (fmIsAggFunc(pCtx[i].functionId)) { + } else if (fmIsSelectFunc(pCtx[i].functionId)) { p = &pCtx[i]; } -// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { -// tagLen += pCtx[i].resDataInfo.bytes; -// pTagCtx[num++] = &pCtx[i]; -// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { -// // tag function may be the group by tag column -// // ts may be the required primary timestamp column -// continue; -// } else { -// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ -// } + // if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { + // tagLen += pCtx[i].resDataInfo.bytes; + // pTagCtx[num++] = &pCtx[i]; + // } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) { + // // tag function may be the group by tag column + // // ts may be the required primary timestamp column + // continue; + // } else { + // // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ + // } } if (p != NULL) { @@ -1124,7 +1126,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, SFuncExecEnv env = {0}; pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; - if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) { + if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) { bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId); if (!isUdaf) { fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); @@ -1883,7 +1885,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO } static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) { if (pFilterNode == NULL) { return; } @@ -2006,8 +2008,9 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_ } } -int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { +int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, + int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t start = pGroupResInfo->index; @@ -2056,11 +2059,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI } else { // expand the result into multiple rows. E.g., _wstartts, top(k, 20) // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - for(int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } } } @@ -2071,14 +2074,16 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI } } - qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId); + qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, + pBlock->info.groupId); blockDataUpdateTsWindow(pBlock, 0); return 0; } -void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { - SExprInfo* pExprInfo = pOperator->pExpr; - int32_t numOfExprs = pOperator->numOfExprs; +void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf) { + SExprInfo* pExprInfo = pOperator->pExpr; + int32_t numOfExprs = pOperator->numOfExprs; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; @@ -2747,10 +2752,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; - idata.info.type = pSchema[i].type; + idata.info.type = pSchema[i].type; idata.info.bytes = pSchema[i].bytes; idata.info.colId = pSchema[i].colId; - idata.hasNull = true; + idata.hasNull = true; taosArrayPush(pBlock->pDataBlock, &idata); if (IS_VAR_DATA_TYPE(idata.info.type)) { @@ -3100,7 +3105,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -SOperatorInfo* createExchangeOperatorInfo(void *pTransporter, const SNodeList* pSources, SSDataBlock* pBlock, +SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3213,7 +3218,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, int32_t rowIndex) { for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index -// pCtx[j].startRow = rowIndex; + // pCtx[j].startRow = rowIndex; } for (int32_t j = 0; j < numOfExpr; ++j) { @@ -3264,7 +3269,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { -// pCtx[i].size = 1; + // pCtx[i].size = 1; } for (int32_t i = 0; i < pBlock->info.rows; ++i) { @@ -3490,10 +3495,11 @@ _error: return NULL; } -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) { +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; @@ -3521,7 +3527,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SOperatorInfo* downstream = pOperator->pDownstream[0]; + SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; @@ -3860,7 +3866,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); + code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, + pProjectInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -4110,7 +4117,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { - STableKeyInfo* pk = taosArrayGet(pa, j); + STableKeyInfo* pk = taosArrayGet(pa, j); STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; pTQueryInfo->lastKey = pk->lastKey; } @@ -4246,9 +4253,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p goto _error; } - pInfo->limit = *pLimit; - pInfo->slimit = *pSlimit; - pInfo->curOffset = pLimit->offset; + pInfo->limit = *pLimit; + pInfo->slimit = *pSlimit; + pInfo->curOffset = pLimit->offset; pInfo->curSOffset = pSlimit->offset; pInfo->binfo.pRes = pResBlock; @@ -4267,15 +4274,15 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo); - pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); - pOperator->name = "ProjectOperator"; + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); + pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = num; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = num; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); @@ -4394,10 +4401,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa } pCol->slotId = slotId; - pCol->colId = colId; - pCol->bytes = pType->bytes; - pCol->type = pType->type; - pCol->scale = pType->scale; + pCol->colId = colId; + pCol->bytes = pType->bytes; + pCol->type = pType->type; + pCol->scale = pType->scale; pCol->precision = pType->precision; pCol->dataBlockId = blockId; @@ -4472,10 +4479,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) { pFuncNode->pParameterList = nodesMakeList(); ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); - SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); - if (NULL == res) { // todo handle error + SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == res) { // todo handle error } else { - res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; + res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; nodesListAppend(pFuncNode->pParameterList, res); } } @@ -4545,7 +4552,7 @@ static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); -void extractTableSchemaVersion(SReadHandle *pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { +void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); metaGetTableEntryByUid(&mr, uid); @@ -4592,7 +4599,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { - SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. + SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; @@ -4601,8 +4608,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); } else { - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, - queryId, taskId); + doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); } if (pDataReader == NULL && terrno != 0) { @@ -4613,15 +4619,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pDescNode); - SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + SArray* pCols = + extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo, - pScanPhyNode->node.pConditions, pOperatorDumy); + SOperatorInfo* pOperator = + createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, + tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { @@ -4633,7 +4641,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pDescNode); int32_t numOfOutputCols = 0; - SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + SArray* colList = + extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createSysTableScanOperatorInfo( pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); @@ -4655,8 +4664,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num); int32_t numOfOutputCols = 0; - SArray* colList = - extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, + COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo); @@ -4738,7 +4747,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); + SArray* pColList = + extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { @@ -4770,7 +4780,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; - SColumn col = extractColumnFromColumnNode(pColNode); + SColumn col = extractColumnFromColumnNode(pColNode); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; @@ -4838,11 +4848,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { SColumn c = {0}; - c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; - c.scale = pColNode->node.resType.scale; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.scale = pColNode->node.resType.scale; c.precision = pColNode->node.resType.precision; return c; } @@ -5239,15 +5249,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, - const char* pKey, const char* pDir) { +int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, const char* pKey, + const char* pDir) { pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); int32_t pageSize = rowSize * 32; int32_t bufSize = pageSize * 4096; createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);; + pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK); + ; return TSDB_CODE_SUCCESS; } - diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h index 4d45eb91cebced879e75ce23368b7a650d119f2e..21d277665872fc520ecea0fe6157b8338789499b 100644 --- a/source/libs/function/inc/functionMgtInt.h +++ b/source/libs/function/inc/functionMgtInt.h @@ -28,7 +28,7 @@ extern "C" { #define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0) #define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1) -#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2) +#define FUNC_MGT_INDEFINITE_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2) #define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3) #define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4) #define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 3e71888bf9fe3dd8e2d14cd3289f287bbd6494ed..f20e98f59275e037a50ba75009daf64c14394978 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -14,8 +14,8 @@ */ #include "builtins.h" -#include "querynodes.h" #include "builtinsimpl.h" +#include "querynodes.h" #include "scalar.h" #include "taoserror.h" #include "tdatablock.h" @@ -215,7 +215,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - SValueNode* pValue = (SValueNode*) pParamNode; + SValueNode* pValue = (SValueNode*)pParamNode; if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } @@ -336,7 +336,7 @@ static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT }; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } @@ -361,7 +361,7 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32 return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT }; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } @@ -392,7 +392,7 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } } - pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType}; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType}; return TSDB_CODE_SUCCESS; } @@ -434,7 +434,7 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) } SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); - uint8_t colType = pCol->resType.type; + uint8_t colType = pCol->resType.type; if (IS_VAR_DATA_TYPE(colType)) { pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType}; } else { @@ -463,7 +463,7 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); - uint8_t colType = pCol->resType.type; + uint8_t colType = pCol->resType.type; if (IS_VAR_DATA_TYPE(colType)) { pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType}; } else { @@ -500,8 +500,7 @@ static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); if (QUERY_NODE_COLUMN != nodeType(pPara)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The parameters of UNIQUE can only be columns"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE can only be columns"); } pFunc->node.resType = ((SExprNode*)pPara)->resType; @@ -823,7 +822,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTop, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, @@ -833,7 +832,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateBottom, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, @@ -915,7 +914,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, @@ -925,7 +924,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "state_count", .type = FUNCTION_TYPE_STATE_COUNT, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateStateCount, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, @@ -935,7 +934,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "state_duration", .type = FUNCTION_TYPE_STATE_DURATION, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateStateDuration, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, @@ -945,7 +944,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "csum", .type = FUNCTION_TYPE_CSUM, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateCsum, .getEnvFunc = getCsumFuncEnv, .initFunc = functionSetup, @@ -955,7 +954,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "mavg", .type = FUNCTION_TYPE_MAVG, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateMavg, .getEnvFunc = getMavgFuncEnv, .initFunc = mavgFunctionSetup, @@ -965,7 +964,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "sample", .type = FUNCTION_TYPE_SAMPLE, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateSample, .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, @@ -975,7 +974,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "tail", .type = FUNCTION_TYPE_TAIL, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateTail, .getEnvFunc = getTailFuncEnv, .initFunc = tailFunctionSetup, @@ -985,7 +984,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "unique", .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateUnique, .getEnvFunc = getUniqueFuncEnv, .initFunc = uniqueFunctionSetup, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 3b1e66f2ad5a4818e8d8b3e502b14a825d66c8e8..4bbefc57c77ea768927f2db290c3b90e0b95a0bb 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -149,6 +149,8 @@ bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MG bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); } +bool fmIsVectorFunc(int32_t funcId) { return !fmIsScalarFunc(funcId); } + bool fmIsSelectFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC); } bool fmIsTimelineFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_TIMELINE_FUNC); } @@ -161,7 +163,7 @@ bool fmIsWindowPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc( bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); } -bool fmIsNonstandardSQLFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC); } +bool fmIsIndefiniteRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INDEFINITE_ROWS_FUNC); } bool fmIsSpecialDataRequiredFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0e8f530b0eb8d209f73cf349a4ca8dd590a2e304..ea99f183fbc70f61fb6b14db85aa10cc58a40d84 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -490,6 +490,7 @@ static const char* jkScanLogicPlanScanCols = "ScanCols"; static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols"; static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize"; static const char* jkScanLogicPlanTableMeta = "TableMeta"; +static const char* jkScanLogicPlanTagCond = "TagCond"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -507,6 +508,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond); + } return code; } @@ -528,6 +532,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond); + } return code; } @@ -1775,6 +1782,7 @@ static const char* jkSubplanDbFName = "DbFName"; static const char* jkSubplanNodeAddr = "NodeAddr"; static const char* jkSubplanRootNode = "RootNode"; static const char* jkSubplanDataSink = "DataSink"; +static const char* jkSubplanTagCond = "TagCond"; static int32_t subplanToJson(const void* pObj, SJson* pJson) { const SSubplan* pNode = (const SSubplan*)pObj; @@ -1801,6 +1809,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSubplanDataSink, nodeToJson, pNode->pDataSink); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSubplanTagCond, nodeToJson, pNode->pTagCond); + } return code; } @@ -1831,6 +1842,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkSubplanDataSink, (SNode**)&pNode->pDataSink); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSubplanTagCond, (SNode**)&pNode->pTagCond); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 80c4593d9bb683788953d0410f654b813d0c126a..f93f0218d4537218e3a3fdc07686995d9bb4935c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -342,25 +342,19 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ CHECK_OUT_OF_MEM(cond); cond->condType = type; cond->pParameterList = nodesMakeList(); - if ((QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type != ((SLogicConditionNode*)pParam1)->condType) || - (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type != ((SLogicConditionNode*)pParam2)->condType)) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type == ((SLogicConditionNode*)pParam1)->condType) { + nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList); + ((SLogicConditionNode*)pParam1)->pParameterList = NULL; + nodesDestroyNode(pParam1); + } else { nodesListAppend(cond->pParameterList, pParam1); - nodesListAppend(cond->pParameterList, pParam2); + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type == ((SLogicConditionNode*)pParam2)->condType) { + nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList); + ((SLogicConditionNode*)pParam2)->pParameterList = NULL; + nodesDestroyNode(pParam2); } else { - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1)) { - nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList); - ((SLogicConditionNode*)pParam1)->pParameterList = NULL; - nodesDestroyNode(pParam1); - } else { - nodesListAppend(cond->pParameterList, pParam1); - } - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2)) { - nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList); - ((SLogicConditionNode*)pParam2)->pParameterList = NULL; - nodesDestroyNode(pParam2); - } else { - nodesListAppend(cond->pParameterList, pParam2); - } + nodesListAppend(cond->pParameterList, pParam2); } return (SNode*)cond; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 99e1135599a86961ad9794dad32da6fe627ec2c3..ab7f137b32c174b916493137ecb7274964f67482 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -292,8 +292,8 @@ static bool isScanPseudoColumnFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); } -static bool isNonstandardSQLFunc(const SNode* pNode) { - return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsNonstandardSQLFunc(((SFunctionNode*)pNode)->funcId)); +static bool isIndefiniteRowsFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsIndefiniteRowsFunc(((SFunctionNode*)pNode)->funcId)); } static bool isDistinctOrderBy(STranslateContext* pCxt) { @@ -806,7 +806,7 @@ static EDealRes haveAggOrNonstdFunction(SNode* pNode, void* pContext) { if (isAggFunc(pNode)) { *((bool*)pContext) = true; return DEAL_RES_END; - } else if (isNonstandardSQLFunc(pNode)) { + } else if (isIndefiniteRowsFunc(pNode)) { *((bool*)pContext) = true; return DEAL_RES_END; } @@ -851,6 +851,15 @@ static bool hasInvalidFuncNesting(SNodeList* pParameterList) { return hasInvalidFunc; } +static int32_t getFuncInfo(STranslateContext* pCxt, SFunctionNode* pFunc) { + SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog, + .pRpc = pCxt->pParseCxt->pTransporter, + .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet, + .pErrBuf = pCxt->msgBuf.buf, + .errBufLen = pCxt->msgBuf.len}; + return fmGetFuncInfo(¶m, pFunc); +} + static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { SNode* pParam = NULL; FOREACH(pParam, pFunc->pParameterList) { @@ -859,12 +868,7 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) } } - SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog, - .pRpc = pCxt->pParseCxt->pTransporter, - .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet, - .pErrBuf = pCxt->msgBuf.buf, - .errBufLen = pCxt->msgBuf.len}; - pCxt->errCode = fmGetFuncInfo(¶m, pFunc); + pCxt->errCode = getFuncInfo(pCxt, pFunc); if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsAggFunc(pFunc->funcId)) { if (beforeHaving(pCxt->currClause)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION); @@ -872,7 +876,7 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) if (hasInvalidFuncNesting(pFunc->pParameterList)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING); } - if (pCxt->pCurrStmt->hasNonstdSQLFunc) { + if (pCxt->pCurrStmt->hasIndefiniteRowsFunc) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); } @@ -899,14 +903,15 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) } } } - if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsNonstandardSQLFunc(pFunc->funcId)) { - if (SQL_CLAUSE_SELECT != pCxt->currClause || pCxt->pCurrStmt->hasNonstdSQLFunc || pCxt->pCurrStmt->hasAggFuncs) { + if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsIndefiniteRowsFunc(pFunc->funcId)) { + if (SQL_CLAUSE_SELECT != pCxt->currClause || pCxt->pCurrStmt->hasIndefiniteRowsFunc || + pCxt->pCurrStmt->hasAggFuncs) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); } if (hasInvalidFuncNesting(pFunc->pParameterList)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING); } - pCxt->pCurrStmt->hasNonstdSQLFunc = true; + pCxt->pCurrStmt->hasIndefiniteRowsFunc = true; } return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR; } @@ -990,7 +995,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); if (TSDB_CODE_SUCCESS == pCxt->errCode) { - translateFunction(pCxt, pFunc); + pCxt->errCode == getFuncInfo(pCxt, pFunc); } if (TSDB_CODE_SUCCESS == pCxt->errCode) { *pNode = (SNode*)pFunc; @@ -1060,7 +1065,7 @@ static int32_t checkExprListForGroupBy(STranslateContext* pCxt, SNodeList* pList } static EDealRes rewriteColsToSelectValFuncImpl(SNode** pNode, void* pContext) { - if (isAggFunc(*pNode)) { + if (isAggFunc(*pNode) || isIndefiniteRowsFunc(*pNode)) { return DEAL_RES_IGNORE_CHILD; } if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { @@ -1097,7 +1102,7 @@ static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) { pCxt->existAggFunc = true; return DEAL_RES_IGNORE_CHILD; } - if (isNonstandardSQLFunc(pNode)) { + if (isIndefiniteRowsFunc(pNode)) { pCxt->existNonstdFunc = true; return DEAL_RES_IGNORE_CHILD; } @@ -1939,7 +1944,7 @@ static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType d nodesDestroyNode(pFunc); return TSDB_CODE_OUT_OF_MEMORY; } - if (DEAL_RES_ERROR == translateFunction(pCxt, pFunc)) { + if (TSDB_CODE_SUCCESS != getFuncInfo(pCxt, pFunc)) { nodesClearList(pFunc->pParameterList); pFunc->pParameterList = NULL; nodesDestroyNode(pFunc); @@ -4082,10 +4087,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS } static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) { - if (DEAL_RES_ERROR == translateFunction(pCxt, pFunc)) { - return pCxt->errCode; + int32_t code = getFuncInfo(pCxt, pFunc); + if (TSDB_CODE_SUCCESS == code) { + code = scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal); } - return scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal); + return code; } static int32_t colDataBytesToValueDataBytes(uint8_t type, int32_t bytes) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 6c567fd4ab90729277532b6e95f9d55ba1e787d2..4e77ae5fba32314fafed0de7538056794616c7b1 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -418,7 +418,7 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr) } static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { - if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) { + if (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && NULL == pSelect->pGroupByList) { return TSDB_CODE_SUCCESS; } @@ -442,8 +442,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); } - if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) { - code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs); + if (TSDB_CODE_SUCCESS == code && (pSelect->hasAggFuncs || pSelect->hasIndefiniteRowsFunc)) { + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsVectorFunc, &pAgg->pAggFuncs); } // rewrite the expression in subsequent clauses diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8645225c04bc82e1ffa9c36db0ab482a8dd6b5a3..6ee931ccffcf2708af4cac28d8c56db18c7d5ae2 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -313,22 +313,53 @@ static EDealRes cpdIsPrimaryKeyCondImpl(SNode* pNode, void* pContext) { } static bool cpdIsPrimaryKeyCond(SNode* pNode) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) { + return false; + } bool isPrimaryKeyCond = false; nodesWalkExpr(pNode, cpdIsPrimaryKeyCondImpl, &isPrimaryKeyCond); return isPrimaryKeyCond; } -static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) { +static EDealRes cpdIsTagCondImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + *((bool*)pContext) = ((COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType) ? true : false); + return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + +static bool cpdIsTagCond(SNode* pNode) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) { + return false; + } + bool isTagCond = false; + nodesWalkExpr(pNode, cpdIsTagCondImpl, &isTagCond); + return isTagCond; +} + +static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pTagCond, + SNode** pOtherCond) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pScan->node.pConditions; + if (LOGIC_COND_TYPE_AND != pLogicCond->condType) { + *pPrimaryKeyCond = NULL; + *pOtherCond = pScan->node.pConditions; + pScan->node.pConditions = NULL; + return TSDB_CODE_SUCCESS; + } + int32_t code = TSDB_CODE_SUCCESS; SNodeList* pPrimaryKeyConds = NULL; + SNodeList* pTagConds = NULL; SNodeList* pOtherConds = NULL; SNode* pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { if (cpdIsPrimaryKeyCond(pCond)) { code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); + } else if (cpdIsTagCond(pScan->node.pConditions)) { + code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond)); } else { code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond)); } @@ -338,37 +369,46 @@ static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimary } SNode* pTempPrimaryKeyCond = NULL; + SNode* pTempTagCond = NULL; SNode* pTempOtherCond = NULL; if (TSDB_CODE_SUCCESS == code) { code = cpdMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds); } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMergeConds(&pTempTagCond, &pTagConds); + } if (TSDB_CODE_SUCCESS == code) { code = cpdMergeConds(&pTempOtherCond, &pOtherConds); } if (TSDB_CODE_SUCCESS == code) { *pPrimaryKeyCond = pTempPrimaryKeyCond; + *pTagCond = pTempTagCond; *pOtherCond = pTempOtherCond; nodesDestroyNode(pScan->node.pConditions); pScan->node.pConditions = NULL; } else { nodesDestroyList(pPrimaryKeyConds); + nodesDestroyList(pTagConds); nodesDestroyList(pOtherConds); nodesDestroyNode(pTempPrimaryKeyCond); + nodesDestroyNode(pTempTagCond); nodesDestroyNode(pTempOtherCond); } return code; } -static int32_t cpdPartitionScanCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) { - if (QUERY_NODE_LOGIC_CONDITION == nodeType(pScan->node.pConditions) && - LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)pScan->node.pConditions)->condType) { - return cpdPartitionScanLogicCond(pScan, pPrimaryKeyCond, pOtherCond); +static int32_t cpdPartitionScanCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pTagCond, + SNode** pOtherCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pScan->node.pConditions)) { + return cpdPartitionScanLogicCond(pScan, pPrimaryKeyCond, pTagCond, pOtherCond); } if (cpdIsPrimaryKeyCond(pScan->node.pConditions)) { *pPrimaryKeyCond = pScan->node.pConditions; + } else if (cpdIsTagCond(pScan->node.pConditions)) { + *pTagCond = pScan->node.pConditions; } else { *pOtherCond = pScan->node.pConditions; } @@ -391,6 +431,36 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, return code; } +typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus; + +static SIdxFltStatus idxGetFltStatus(SNode* pFilterNode) { return SFLT_ACCURATE_INDEX; } + +static int32_t cpdApplyTagIndex(SScanLogicNode* pScan, SNode** pTagCond, SNode** pOtherCond) { + int32_t code = TSDB_CODE_SUCCESS; + SIdxFltStatus idxStatus = idxGetFltStatus(*pTagCond); + switch (idxStatus) { + case SFLT_NOT_INDEX: + code = cpdCondAppend(pOtherCond, pTagCond); + break; + case SFLT_COARSE_INDEX: + pScan->pTagCond = nodesCloneNode(*pTagCond); + if (NULL == pScan->pTagCond) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + code = cpdCondAppend(pOtherCond, pTagCond); + break; + case SFLT_ACCURATE_INDEX: + pScan->pTagCond = *pTagCond; + *pTagCond = NULL; + break; + default: + code = TSDB_CODE_FAILED; + break; + } + return code; +} + static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) { if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) || TSDB_SYSTEM_TABLE == pScan->pMeta->tableType) { @@ -398,11 +468,15 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* } SNode* pPrimaryKeyCond = NULL; + SNode* pTagCond = NULL; SNode* pOtherCond = NULL; - int32_t code = cpdPartitionScanCond(pScan, &pPrimaryKeyCond, &pOtherCond); + int32_t code = cpdPartitionScanCond(pScan, &pPrimaryKeyCond, &pTagCond, &pOtherCond); if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) { code = cpdCalcTimeRange(pScan, &pPrimaryKeyCond, &pOtherCond); } + if (TSDB_CODE_SUCCESS == code && NULL != pTagCond) { + code = cpdApplyTagIndex(pScan, &pTagCond, &pOtherCond); + } if (TSDB_CODE_SUCCESS == code) { pScan->node.pConditions = pOtherCond; } @@ -618,30 +692,6 @@ static bool cpdContainPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { } } -// static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) { -// if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) { -// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); -// } -// return TSDB_CODE_SUCCESS; -// } - -// static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) { -// if (LOGIC_COND_TYPE_AND != pOnCond->condType) { -// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); -// } -// bool hasPrimaryKeyEqualCond = false; -// SNode* pCond = NULL; -// FOREACH(pCond, pOnCond->pParameterList) { -// if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) { -// hasPrimaryKeyEqualCond = true; -// } -// } -// if (!hasPrimaryKeyEqualCond) { -// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); -// } -// return TSDB_CODE_SUCCESS; -// } - static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (NULL == pJoin->pOnConditions) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); @@ -650,11 +700,6 @@ static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); } return TSDB_CODE_SUCCESS; - // if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) { - // return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions); - // } else { - // return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions); - // } } static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index affe9ef2f61f0afce95ba7f1feec5f65f23a7f1f..618820640a519689bea65013d1b1d375dc6787fe 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -411,7 +411,7 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys return sortScanCols(pScanPhysiNode->pScanCols); } -static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, +static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) { int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols); if (TSDB_CODE_SUCCESS == code) { @@ -438,6 +438,12 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName)); + if (NULL != pScanLogicNode->pTagCond) { + pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond); + if (NULL == pSubplan->pTagCond) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } } if (TSDB_CODE_SUCCESS == code) { @@ -463,7 +469,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla } vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); - return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode); + return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode); } static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, @@ -498,7 +504,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->intervalUnit = pScanLogicNode->intervalUnit; pTableScan->slidingUnit = pScanLogicNode->slidingUnit; - return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); + return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); } static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, @@ -522,7 +528,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); - return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); + return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); } static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 8e6c04bb337b900b1ebc8b33c73b29d111231ab3..af62c52a89baa90aaf857fa6606267a437275f87 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -18,6 +18,13 @@ #include "planInt.h" #include "scalar.h" +static void dumpQueryPlan(SQueryPlan* pPlan) { + char* pStr = NULL; + nodesNodeToString(pPlan, false, &pStr, NULL); + planDebugL("Query Plan: %s", pStr); + taosMemoryFree(pStr); +} + int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) { SLogicNode* pLogicNode = NULL; SLogicSubplan* pLogicSubplan = NULL; @@ -36,6 +43,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo if (TSDB_CODE_SUCCESS == code) { code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList); } + if (TSDB_CODE_SUCCESS == code) { + dumpQueryPlan(*pPlan); + } nodesDestroyNode(pLogicNode); nodesDestroyNode(pLogicSubplan); diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 77f9b5846c1edbe879c21d90c978a01232c2444e..4234a1320a433da1184cdca6d5f567fa3a2005c6 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -32,6 +32,12 @@ TEST_F(PlanOptimizeTest, optimizeScanData) { run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1"); } +TEST_F(PlanOptimizeTest, ConditionPushDown) { + useDb("root", "test"); + + run("SELECT ts, c1 FROM st1 WHERE tag1 > 4"); +} + TEST_F(PlanOptimizeTest, orderByPrimaryKey) { useDb("root", "test"); diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 6e184fec724ceffbaa186367956d1318c59d2fb9..c0f1f2127474bc0b95caa35741266060df35690a 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -232,45 +232,45 @@ class PlannerTestBaseImpl { if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { if (res_.prepareAst_.empty()) { - cout << "syntax tree : " << endl; + cout << "+++++++++++++++++++++syntax tree : " << endl; cout << res_.ast_ << endl; } else { - cout << "prepare syntax tree : " << endl; + cout << "+++++++++++++++++++++prepare syntax tree : " << endl; cout << res_.prepareAst_ << endl; - cout << "bound syntax tree : " << endl; + cout << "+++++++++++++++++++++bound syntax tree : " << endl; cout << res_.boundAst_ << endl; - cout << "syntax tree : " << endl; + cout << "+++++++++++++++++++++syntax tree : " << endl; cout << res_.ast_ << endl; } } if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) { - cout << "raw logic plan : " << endl; + cout << "+++++++++++++++++++++raw logic plan : " << endl; cout << res_.rawLogicPlan_ << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_OPTIMIZED == module) { - cout << "optimized logic plan : " << endl; + cout << "+++++++++++++++++++++optimized logic plan : " << endl; cout << res_.optimizedLogicPlan_ << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_SPLIT == module) { - cout << "split logic plan : " << endl; + cout << "+++++++++++++++++++++split logic plan : " << endl; cout << res_.splitLogicPlan_ << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_SCALED == module) { - cout << "scaled logic plan : " << endl; + cout << "+++++++++++++++++++++scaled logic plan : " << endl; cout << res_.scaledLogicPlan_ << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) { - cout << "physical plan : " << endl; + cout << "+++++++++++++++++++++physical plan : " << endl; cout << res_.physiPlan_ << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) { - cout << "physical subplan : " << endl; + cout << "+++++++++++++++++++++physical subplan : " << endl; for (const auto& subplan : res_.physiSubplans_) { cout << subplan << endl; }