From 88c767bcdbd4a50e84e97f84dacd0a061b8ce4e6 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 10 May 2022 08:55:32 +0800 Subject: [PATCH] fix: problem of parser and planner --- source/libs/executor/src/executorimpl.c | 131 +++++++++++++----------- 1 file changed, 72 insertions(+), 59 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1327ddb48e..2ca0cc1d7c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -203,9 +203,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); -// if (!pDescNode->output) { // todo disable it temporarily -// continue; -// } + // if (!pDescNode->output) { // todo disable it temporarily + // continue; + // } idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; @@ -833,7 +833,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0], pfCtx->input.numOfRows); + colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0], + pfCtx->input.numOfRows); } else { colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); } @@ -1873,7 +1874,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) } void initResultRow(SResultRow* pResultRow) { -// pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); + // pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); } /* @@ -1885,7 +1886,8 @@ void initResultRow(SResultRow* pResultRow) { * offset[0] offset[1] offset[2] */ // TODO refactor: some function move away -void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) { +void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, + SExecTaskInfo* pTaskInfo) { SqlFunctionCtx* pCtx = pInfo->pCtx; SSDataBlock* pDataBlock = pInfo->pRes; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; @@ -2240,12 +2242,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased return 0; } -void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf) { +void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, + SDiskbasedBuf* pBuf) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); - int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; - SSDataBlock* pBlock = pbInfo->pRes; - SqlFunctionCtx* pCtx = pbInfo->pCtx; + int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; + SSDataBlock* pBlock = pbInfo->pRes; + SqlFunctionCtx* pCtx = pbInfo->pCtx; blockDataCleanup(pBlock); if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { @@ -2419,7 +2422,8 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; if (pSummary->pRecoder != NULL) { - qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 " us, total blocks:%d, " + qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 + " us, total blocks:%d, " "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows); @@ -2954,7 +2958,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI relocateColumnData(pRes, pColList, pBlock->pDataBlock); taosArrayDestroy(pBlock->pDataBlock); taosMemoryFree(pBlock); -// blockDataDestroy(pBlock); + // blockDataDestroy(pBlock); } pRes->info.rows = numOfRows; @@ -4099,8 +4103,8 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { } // todo handle different group data interpolation - bool n = false; - bool *newgroup = &n; + bool n = false; + bool* newgroup = &n; doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) { return pResBlock; @@ -4224,7 +4228,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n } uint32_t defaultPgsz = 4096; - while(defaultPgsz < pAggSup->resultRowSize*4) { + while (defaultPgsz < pAggSup->resultRowSize * 4) { defaultPgsz <<= 1u; } @@ -4375,7 +4379,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { } void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { - SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*) param; + SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; } void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { @@ -4487,8 +4491,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t } SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* pValueNode, - bool multigroupResult, SExecTaskInfo* pTaskInfo) { + SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, + int32_t fillType, SNodeListNode* pValueNode, bool multigroupResult, + SExecTaskInfo* pTaskInfo) { SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -4522,8 +4527,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SResultInfo* pResultInfo = &pOperator->resultInfo; initResultSizeInfo(pOperator, 4096); - int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity, - pTaskInfo->id.str, pInterval, type); + int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity, pTaskInfo->id.str, + pInterval, type); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4736,7 +4741,8 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t uint64_t queryId, uint64_t taskId); static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractColumnInfo(SNodeList* pNodeList); -static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); +static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type); static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); @@ -4776,15 +4782,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pDescNode); SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { return NULL; } - SInterval interval = extractIntervalInfo(pTableScanNode); - SOperatorInfo* pOperator = createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, - pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions, - &interval, pTableScanNode->ratio, pTaskInfo); + SInterval interval = extractIntervalInfo(pTableScanNode); + SOperatorInfo* pOperator = createTableScanOperatorInfo( + pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock, + pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; @@ -4795,14 +4801,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, + queryId, taskId); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SSDataBlock* pResBlock = createResDataBlock(pDescNode); - int32_t numOfCols = 0; - SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t numOfCols = 0; + SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo, pScanPhyNode->node.pConditions); taosArrayDestroy(tableIdList); @@ -4822,25 +4829,27 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { - STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode; + STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; SSDataBlock* pResBlock = createResDataBlock(pDescNode); - int32_t code = - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, + queryId, taskId); if (code != TSDB_CODE_SUCCESS) { return NULL; } - int32_t num = 0; + int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num); int32_t numOfOutputCols = 0; - SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID); + SArray* colList = + extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID); - SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo); + SOperatorInfo* pOperator = + createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo); return pOperator; } else { ASSERT(0); @@ -4887,7 +4896,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pTaskInfo, pTableGroupInfo); } - } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); @@ -4925,13 +4934,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType}; + STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, + .calTrigger = pSessionNode->window.triggerType}; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); + pOptr = + createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); @@ -4957,11 +4968,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num); SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval; - pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo); + pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, + (SNodeListNode*)pFillNode->pValues, false, pTaskInfo); } else { ASSERT(0); } @@ -5101,7 +5113,8 @@ SArray* createSortInfo(SNodeList* pNodeList) { return pList; } -SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type) { +SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type) { size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); if (pList == NULL) { @@ -5114,10 +5127,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; SColMatchInfo c = {0}; - c.output = true; - c.colId = pColNode->colId; - c.srcSlotId = pColNode->slotId; - c.matchType = type; + c.output = true; + c.colId = pColNode->colId; + c.srcSlotId = pColNode->slotId; + c.matchType = type; c.targetSlotId = pNode->slotId; taosArrayPush(pList, &c); } @@ -5527,8 +5540,8 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { } SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, - int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, - SExecTaskInfo* pTaskInfo) { + int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, + SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { @@ -5537,14 +5550,14 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t initResultSizeInfo(pOperator, 4096); - pInfo->pRes = pResBlock; - pOperator->name = "MergeJoinOperator"; + pInfo->pRes = pResBlock; + pOperator->name = "MergeJoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; SOperatorNode* pNode = (SOperatorNode*)pOnCondition; @@ -5568,9 +5581,9 @@ _error: } void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { - pColumn->slotId = pColumnNode->slotId; - pColumn->type = pColumnNode->node.resType.type; - pColumn->bytes = pColumnNode->node.resType.bytes; + pColumn->slotId = pColumnNode->slotId; + pColumn->type = pColumnNode->node.resType.type; + pColumn->bytes = pColumnNode->node.resType.bytes; pColumn->precision = pColumnNode->node.resType.precision; - pColumn->scale = pColumnNode->node.resType.scale; + pColumn->scale = pColumnNode->node.resType.scale; } -- GitLab