diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e382fa4efd4eea7f22cf41da92ec751bd7a2fa4a..ba16acf7b00e6e485262c7ae372f5c1977c9cea9 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -104,6 +104,7 @@ typedef struct SJoinLogicNode { SNode* pMergeCondition; SNode* pOnConditions; bool isSingleTableJoin; + EOrder inputTsOrder; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -201,6 +202,7 @@ typedef struct SWindowLogicNode { int64_t watermark; int8_t igExpired; EWindowAlgorithm windowAlgo; + EOrder inputTsOrder; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; -typedef struct SJoinPhysiNode { +typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; SNode* pMergeCondition; SNode* pOnConditions; SNodeList* pTargets; -} SJoinPhysiNode; - -typedef SJoinPhysiNode SSortMergeJoinPhysiNode; + EOrder inputTsOrder; +} SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { SPhysiNode node; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 9ffdfc2289c6865a246d2782a121339dd095da57..266f96b41e89264e7813a47cdde1932ae7c48a79 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; pPhysiChildren = pJoinNode->node.pChildren; break; } @@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, - QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT : EXPLAIN_TBL_SCAN_FORMAT, + QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT + : EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { @@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); if (pSTblScanNode->scan.pScanPseudoCols) { EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { @@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); if (pDistScanNode->pScanPseudoCols) { EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - + EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } break; - } + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_QRY_APP_ERROR; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1ad17bbc76644686a6722f90760035107b830918..44394bb2f0f48cd0b2ec8561294a94bf7fcb2753 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -321,6 +321,49 @@ typedef struct STableScanInfo { int8_t noTable; } STableScanInfo; +typedef struct STableMergeScanInfo { + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + + SFileBlockLoadRecorder readRecorder; + int64_t numOfRows; + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowEntryInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; + + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SqlFunctionCtx* pPseudoCtx; + + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + // if the upstream is an interval operator, the interval info is also kept here to get the time + // window to check if current data block needs to be loaded. + SInterval interval; + SSampleExecInfo sample; // sample execution info +} STableMergeScanInfo; + typedef struct STagScanInfo { SColumnInfo *pCols; SSDataBlock *pRes; @@ -881,7 +924,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7bac828a537755e67eb002e3326688d275fb3cb6..7194b16a78cabc5336c6b3402f26c58c6834b732 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1356,7 +1356,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); if (pColMatchInfo != NULL) { - for(int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); @@ -2885,11 +2885,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; - } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; *order = pTableScanInfo->cond.order; *scanFlag = pTableScanInfo->scanFlag; return TSDB_CODE_SUCCESS; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + STableMergeScanInfo* pTableScanInfo = pOperator->info; + *order = pTableScanInfo->cond.order; + *scanFlag = pTableScanInfo->scanFlag; + return TSDB_CODE_SUCCESS; } else { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { return TSDB_CODE_INVALID_PARA; @@ -3307,9 +3312,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } SOperatorInfo* downstream = pOperator->pDownstream[0]; - SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; + SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; - while(1) { + while (1) { while (1) { blockDataCleanup(pRes); @@ -3326,7 +3331,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } if (pLimitInfo->remainGroupOffset > 0) { - if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group + if (pLimitInfo->currentGroupId == 0 || + pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.groupId; continue; } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { @@ -4265,7 +4271,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { } // this the tags and pseudo function columns, we only keep the tag columns - for(int32_t i = 0; i < numOfTags; ++i) { + for (int32_t i = 0; i < numOfTags; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i); int32_t type = nodeType(pNode->pExpr); @@ -4381,7 +4387,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, int32_t groupNum = 0; for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); + int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4701,7 +4707,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { - pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo); + pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 2e6c9bd351b5c5b9bdff21ca6d44f4d6520e0eab..f26b2f4f0a0c9e9a4fd33111bd689de82b055a36 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); - int32_t numOfCols = 0; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pRes = pResBlock; - pOperator->name = "MergeJoinOperator"; + pInfo->pRes = pResBlock; + pOperator->name = "MergeJoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; SNode* pMergeCondition = pJoinNode->pMergeCondition; if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { @@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; nodesDestroyNode(pJoinOperator->pCondAfterMerge); - + taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a9d03aebbe365f4c4e25129cdeb1b2c02f51c505..539ef18d87158f196b13c5310e697626665066b6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); } else { - qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st)); + qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); } return TSDB_CODE_SUCCESS; @@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type; pColInfoData = taosArrayGet(p->pDataBlock, 4); char tagTypeStr[VARSTR_HEADER_SIZE + 32]; - int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); + int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); if (tagType == TSDB_DATA_TYPE_VARCHAR) { - tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); + tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", + (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); } else if (tagType == TSDB_DATA_TYPE_NCHAR) { - tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); + tagTypeLen += + sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", + (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(tagTypeStr, tagTypeLen); colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false); @@ -2527,49 +2530,6 @@ _error: return NULL; } -typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - - SFileBlockLoadRecorder readRecorder; - int64_t numOfRows; - SScanInfo scanInfo; - int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer - SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context - SResultRowInfo* pResultRowInfo; - int32_t* rowEntryInfoOffset; - SExprInfo* pExpr; - SSDataBlock* pResBlock; - SArray* pColMatchInfo; - int32_t numOfOutput; - - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SqlFunctionCtx* pPseudoCtx; - - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - // if the upstream is an interval operator, the interval info is also kept here to get the time - // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info -} STableMergeScanInfo; - int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { @@ -2975,6 +2935,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pTableScanInfo->pSortInfo); + taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset); taosMemoryFreeClear(param); } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index eec4780293bdcad4ec3a81ad18b1492ef2a4a849..186a51f0001911ef1b0144e404c6dbdc6b7af89c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { - const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj; + const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { @@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { } static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { - SJoinPhysiNode* pNode = (SJoinPhysiNode*)pObj; + SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index b12e3b14c70d0ba471baf2172c16c6c7b5a617bc..77681af1bc2320fabefd75c326ca3f52123eaeff 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; + SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode; res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 23f0bb088d1aff274adc3fb4607a9acc13d784e7..3c6fbe409c6be5d40b0686376e4a0ec87c9e275e 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - return makeNode(type, sizeof(SJoinPhysiNode)); + return makeNode(type, sizeof(SSortMergeJoinPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return makeNode(type, sizeof(SAggPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) { break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; + SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 702422e022b5292d5829e09c221f77ef6ccaddc0..d564d536335db100d282230c7031d807c28f1f41 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) { +static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, + SArray* tagName, uint8_t tagNum) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; pTbReq->ctb.tagNum = tagNum; - if(sname) pTbReq->ctb.name = strdup(sname); + if (sname) pTbReq->ctb.name = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->commentLen = -1; @@ -969,7 +970,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint goto end; } - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, + pCxt->pTableMeta->tableInfo.numOfTags); end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { @@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache)); CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); @@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols return ret; } - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, + pTableMeta->tableInfo.numOfTags); taosArrayDestroy(tagName); smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 74d5f03dc120a91dbb049733a1aab3605a22e581..7c9a8b10dd206d047154b5e1df5d02d119c814c6 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG: return "sliding value no larger than the interval value"; case TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL: - return "sliding value can not less than 1% of interval value"; + return "sliding value can not less than 1%% of interval value"; case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG: return "Only one tag if there is a json tag"; case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 3b545777a814df7c2eadbf4426143de68efe07b9..fcc395af62b1e05d55707e326dc308bb1b5c9dff 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -993,22 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) { } static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) { - int32_t code = TSDB_CODE_SUCCESS; - switch (nodeType(pNode)) { - case QUERY_NODE_LOGIC_PLAN_SCAN: + case QUERY_NODE_LOGIC_PLAN_SCAN: { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (NULL != pScan->pGroupTags) { + *pNotOptimize = true; + return TSDB_CODE_SUCCESS; + } return nodesListMakeAppend(pScanNodes, (SNode*)pNode); - case QUERY_NODE_LOGIC_PLAN_JOIN: - code = + } + case QUERY_NODE_LOGIC_PLAN_JOIN: { + int32_t code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); if (TSDB_CODE_SUCCESS == code) { code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes); } return code; + } case QUERY_NODE_LOGIC_PLAN_AGG: + case QUERY_NODE_LOGIC_PLAN_PARTITION: *pNotOptimize = true; - return code; + return TSDB_CODE_SUCCESS; default: break; } @@ -1034,6 +1040,18 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) { return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order; } +static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) { + if (NULL == pNode) { + return; + } + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) { + ((SWindowLogicNode*)pNode)->inputTsOrder = order; + } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) { + ((SJoinLogicNode*)pNode)->inputTsOrder = order; + } + sortPriKeyOptSetParentOrder(pNode->pParent, order); +} + static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort, SNodeList* pScanNodes) { EOrder order = sortPriKeyOptGetPriKeyOrder(pSort); @@ -1048,6 +1066,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; } + sortPriKeyOptSetParentOrder(pScan->node.pParent, order); } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0); @@ -1613,10 +1632,10 @@ static void alignProjectionWithTarget(SLogicNode* pNode) { } SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode; - SNode* pProjection = NULL; + SNode* pProjection = NULL; FOREACH(pProjection, pProjectNode->pProjections) { SNode* pTarget = NULL; - bool keep = false; + bool keep = false; FOREACH(pTarget, pNode->pTargets) { if (0 == strcmp(((SColumnNode*)pProjection)->node.aliasName, ((SColumnNode*)pTarget)->colName)) { keep = true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index fd359fce45c312a90f3a47ea787df7ce6a00b1ca..38f3c23925321ec74a3b7c6a1de5a9606f9768e9 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -621,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { - SJoinPhysiNode* pJoin = - (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); + SSortMergeJoinPhysiNode* pJoin = + (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 879c04743dcfcaf941adcb5f1e4b5156f20eaa24..81e2bff1799af7989f2c7b898d05a97ff4b852f3 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent return code; } -static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) { +static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) { SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); if (NULL == pMergeKey) { return TSDB_CODE_OUT_OF_MEMORY; @@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** nodesDestroyNode((SNode*)pMergeKey); return TSDB_CODE_OUT_OF_MEMORY; } - pMergeKey->order = ORDER_ASC; + pMergeKey->order = order; pMergeKey->nullOrder = NULL_ORDER_FIRST; return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey); } @@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE; SNodeList* pMergeKeys = NULL; - code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, + ((SWindowLogicNode*)pInfo->pSplitNode)->inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); } @@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0); SNodeList* pMergeKeys = NULL; - int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); + int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, + ((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true); @@ -950,7 +952,8 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; pMergeScan->node.pChildren = pChildren; splSetParent((SLogicNode*)pMergeScan); - code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), &pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), + pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys); } if (TSDB_CODE_SUCCESS == code) { @@ -1020,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) } static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) { - SNode* pPrimaryKey = - nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0))); + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0); + SNode* pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan)); if (NULL == pPrimaryKey) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys); } return code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 80ad480e43aca9d42e4f755d23e1730aae5514de..ad6eff3c122af7b831ecb3b15157bff54dadc41c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT, "Cannot use 'year' as TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG, "Interval offset should be shorter than interval") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_UNIT, "Does not support sliding when interval is natural month/year") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG, "sliding value no larger than the interval value") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1% of interval value") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1%% of interval value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_ONLY_ONE_JSON_TAG, "Only one tag if there is a json tag") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_NUM_OF_COL, "Query block has incorrect number of result columns") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL, "Incorrect TIMESTAMP value")